// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Threading;

#if !NO_TPL
using System.Threading.Tasks;
#endif

namespace System.Reactive.Linq
{
    public static partial class Observable
    {
        // Every operator in this file validates its arguments and then forwards the call,
        // unchanged, to the matching member on s_impl.

        #region + ForEachAsync +

#if !NO_TPL
        /// <summary>
        /// Runs <paramref name="onNext"/> for every element of the observable sequence and returns a Task object that gets signaled once the sequence terminates.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">The sequence whose elements are processed.</param>
        /// <param name="onNext">Action invoked for each element of the observable sequence.</param>
        /// <returns>A Task signaling the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>Particularly useful together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            if (onNext == null)
            {
                throw new ArgumentNullException("onNext");
            }

            return s_impl.ForEachAsync<TSource>(source, onNext);
        }

        /// <summary>
        /// Runs <paramref name="onNext"/> for every element of the observable sequence and returns a Task object that gets signaled once the sequence terminates.
        /// Setting the supplied cancellation token quits the loop prematurely.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">The sequence whose elements are processed.</param>
        /// <param name="onNext">Action invoked for each element of the observable sequence.</param>
        /// <param name="cancellationToken">Token used to stop the loop.</param>
        /// <returns>A Task signaling the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>Particularly useful together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext, CancellationToken cancellationToken)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            if (onNext == null)
            {
                throw new ArgumentNullException("onNext");
            }

            return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
        }

        /// <summary>
        /// Runs <paramref name="onNext"/> for every element of the observable sequence, passing along the element's index, and returns a Task object that gets signaled once the sequence terminates.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">The sequence whose elements are processed.</param>
        /// <param name="onNext">Action invoked for each element of the observable sequence, incorporating the element's index.</param>
        /// <returns>A Task signaling the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>Particularly useful together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            if (onNext == null)
            {
                throw new ArgumentNullException("onNext");
            }

            return s_impl.ForEachAsync<TSource>(source, onNext);
        }

        /// <summary>
        /// Runs <paramref name="onNext"/> for every element of the observable sequence, passing along the element's index, and returns a Task object that gets signaled once the sequence terminates.
        /// Setting the supplied cancellation token quits the loop prematurely.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">The sequence whose elements are processed.</param>
        /// <param name="onNext">Action invoked for each element of the observable sequence, incorporating the element's index.</param>
        /// <param name="cancellationToken">Token used to stop the loop.</param>
        /// <returns>A Task signaling the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>Particularly useful together with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext, CancellationToken cancellationToken)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            if (onNext == null)
            {
                throw new ArgumentNullException("onNext");
            }

            return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
        }
#endif

        #endregion

        #region + Case +

        /// <summary>
        /// Picks the source in <paramref name="sources"/> designated by <paramref name="selector"/>, falling back to <paramref name="defaultSource"/> when no match is found.
        /// </summary>
        /// <typeparam name="TValue">Type of the value produced by the selector function; it is the key used to look up the resulting source.</typeparam>
        /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
        /// <param name="selector">Function invoked to obtain the key to look up in the <paramref name="sources"/> dictionary.</param>
        /// <param name="sources">Dictionary of candidate sources, selected from according to the <paramref name="selector"/> invocation result.</param>
        /// <param name="defaultSource">Source used when <paramref name="sources"/> holds no matching entry.</param>
        /// <returns>The observable sequence found in the <paramref name="sources"/> dictionary for the <paramref name="selector"/> invocation result, or <paramref name="defaultSource"/> if no match is found.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="defaultSource"/> is null.</exception>
        public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IObservable<TResult> defaultSource)
        {
            if (selector == null)
            {
                throw new ArgumentNullException("selector");
            }

            if (sources == null)
            {
                throw new ArgumentNullException("sources");
            }

            if (defaultSource == null)
            {
                throw new ArgumentNullException("defaultSource");
            }

            return s_impl.Case<TValue, TResult>(selector, sources, defaultSource);
        }

        /// <summary>
        /// Picks the source in <paramref name="sources"/> designated by <paramref name="selector"/>, falling back to an empty sequence on the specified scheduler when no match is found.
        /// </summary>
        /// <typeparam name="TValue">Type of the value produced by the selector function; it is the key used to look up the resulting source.</typeparam>
        /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
        /// <param name="selector">Function invoked to obtain the key to look up in the <paramref name="sources"/> dictionary.</param>
        /// <param name="sources">Dictionary of candidate sources, selected from according to the <paramref name="selector"/> invocation result.</param>
        /// <param name="scheduler">Scheduler on which the empty sequence is generated when <paramref name="sources"/> holds no matching entry.</param>
        /// <returns>The observable sequence found in the <paramref name="sources"/> dictionary for the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IScheduler scheduler)
        {
            if (selector == null)
            {
                throw new ArgumentNullException("selector");
            }

            if (sources == null)
            {
                throw new ArgumentNullException("sources");
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }

            return s_impl.Case<TValue, TResult>(selector, sources, scheduler);
        }

        /// <summary>
        /// Picks the source in <paramref name="sources"/> designated by <paramref name="selector"/>, falling back to an empty sequence when no match is found.
        /// </summary>
        /// <typeparam name="TValue">Type of the value produced by the selector function; it is the key used to look up the resulting source.</typeparam>
        /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
        /// <param name="selector">Function invoked to obtain the key to look up in the <paramref name="sources"/> dictionary.</param>
        /// <param name="sources">Dictionary of candidate sources, selected from according to the <paramref name="selector"/> invocation result.</param>
        /// <returns>The observable sequence found in the <paramref name="sources"/> dictionary for the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> is null.</exception>
        public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources)
        {
            if (selector == null)
            {
                throw new ArgumentNullException("selector");
            }

            if (sources == null)
            {
                throw new ArgumentNullException("sources");
            }

            return s_impl.Case<TValue, TResult>(selector, sources);
        }

        #endregion

        #region + DoWhile +

        /// <summary>
        /// Repeats <paramref name="source"/> for as long as <paramref name="condition"/> holds; the <paramref name="condition"/> is evaluated after each repetition of <paramref name="source"/> has completed.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to repeat while the <paramref name="condition"/> function evaluates to true.</param>
        /// <param name="condition">Condition evaluated when an iteration through the <paramref name="source"/> completes, deciding whether the source needs to be repeated.</param>
        /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence for as long as the <paramref name="condition"/> holds.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="condition"/> is null.</exception>
        public static IObservable<TSource> DoWhile<TSource>(this IObservable<TSource> source, Func<bool> condition)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            if (condition == null)
            {
                throw new ArgumentNullException("condition");
            }

            return s_impl.DoWhile<TSource>(source, condition);
        }

        #endregion

        #region + For +

        /// <summary>
        /// Concatenates the observable sequences produced by running <paramref name="resultSelector"/> over each element of the enumerable <paramref name="source"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the enumerable source sequence.</typeparam>
        /// <typeparam name="TResult">Element type of the observable result sequence.</typeparam>
        /// <param name="source">Enumerable whose elements are each mapped onto an observable source to be concatenated into the result sequence.</param>
        /// <param name="resultSelector">Function selecting an observable source for each element in the <paramref name="source"/>.</param>
        /// <returns>The observable sequence obtained by concatenating the sources returned by <paramref name="resultSelector"/> for each element in the <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
        public static IObservable<TResult> For<TSource, TResult>(IEnumerable<TSource> source, Func<TSource, IObservable<TResult>> resultSelector)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            if (resultSelector == null)
            {
                throw new ArgumentNullException("resultSelector");
            }

            return s_impl.For<TSource, TResult>(source, resultSelector);
        }

        #endregion

        #region + If +

        /// <summary>
        /// Selects the <paramref name="thenSource"/> sequence when <paramref name="condition"/> evaluates true, and the <paramref name="elseSource"/> sequence otherwise.
        /// </summary>
        /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
        /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
        /// <param name="thenSource">Sequence returned when <paramref name="condition"/> evaluates true.</param>
        /// <param name="elseSource">Sequence returned when <paramref name="condition"/> evaluates false.</param>
        /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; <paramref name="elseSource"/> otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="elseSource"/> is null.</exception>
        public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)
        {
            if (condition == null)
            {
                throw new ArgumentNullException("condition");
            }

            if (thenSource == null)
            {
                throw new ArgumentNullException("thenSource");
            }

            if (elseSource == null)
            {
                throw new ArgumentNullException("elseSource");
            }

            return s_impl.If<TResult>(condition, thenSource, elseSource);
        }

        /// <summary>
        /// Selects the <paramref name="thenSource"/> sequence when <paramref name="condition"/> evaluates true, and returns an empty sequence otherwise.
        /// </summary>
        /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
        /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
        /// <param name="thenSource">Sequence returned when <paramref name="condition"/> evaluates true.</param>
        /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> is null.</exception>
        public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource)
        {
            if (condition == null)
            {
                throw new ArgumentNullException("condition");
            }

            if (thenSource == null)
            {
                throw new ArgumentNullException("thenSource");
            }

            return s_impl.If<TResult>(condition, thenSource);
        }

        /// <summary>
        /// Selects the <paramref name="thenSource"/> sequence when <paramref name="condition"/> evaluates true, and returns an empty sequence generated on the specified scheduler otherwise.
        /// </summary>
        /// <typeparam name="TResult">Element type of the result sequence.</typeparam>
        /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
        /// <param name="thenSource">Sequence returned when <paramref name="condition"/> evaluates true.</param>
        /// <param name="scheduler">Scheduler on which the empty sequence is generated when <paramref name="condition"/> evaluates false.</param>
        /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IScheduler scheduler)
        {
            if (condition == null)
            {
                throw new ArgumentNullException("condition");
            }

            if (thenSource == null)
            {
                throw new ArgumentNullException("thenSource");
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }

            return s_impl.If<TResult>(condition, thenSource, scheduler);
        }

        #endregion

        #region + While +

        /// <summary>
        /// Repeats <paramref name="source"/> for as long as <paramref name="condition"/> holds; the <paramref name="condition"/> is evaluated before each repetition of <paramref name="source"/> is subscribed to.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Sequence to repeat while the <paramref name="condition"/> function evaluates to true.</param>
        /// <param name="condition">Condition evaluated before subscribing to the <paramref name="source"/>, deciding whether the source needs to be repeated.</param>
        /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence for as long as the <paramref name="condition"/> holds.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="source"/> is null.</exception>
        public static IObservable<TSource> While<TSource>(Func<bool> condition, IObservable<TSource> source)
        {
            if (condition == null)
            {
                throw new ArgumentNullException("condition");
            }

            if (source == null)
            {
                throw new ArgumentNullException("source");
            }

            return s_impl.While<TSource>(condition, source);
        }

        #endregion
    }
}