1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 using System.Collections.Generic;
4 using System.Reactive.Concurrency;
5 using System.Threading;
6 
7 #if !NO_TPL
8 using System.Threading.Tasks;
9 #endif
10 
11 namespace System.Reactive.Linq
12 {
13     public static partial class Observable
14     {
15         #region + ForEachAsync +
16 
17 #if !NO_TPL
18         /// <summary>
19         /// Invokes an action for each element in the observable sequence, and returns a Task object that will get signaled when the sequence terminates.
20         /// </summary>
21         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
22         /// <param name="source">Source sequence.</param>
23         /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
24         /// <returns>Task that signals the termination of the sequence.</returns>
25         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
26         /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
ForEachAsync(this IObservable<TSource> source, Action<TSource> onNext)27         public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
28         {
29             if (source == null)
30                 throw new ArgumentNullException("source");
31             if (onNext == null)
32                 throw new ArgumentNullException("onNext");
33 
34             return s_impl.ForEachAsync<TSource>(source, onNext);
35         }
36 
37         /// <summary>
38         /// Invokes an action for each element in the observable sequence, and returns a Task object that will get signaled when the sequence terminates.
39         /// The loop can be quit prematurely by setting the specified cancellation token.
40         /// </summary>
41         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
42         /// <param name="source">Source sequence.</param>
43         /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
44         /// <param name="cancellationToken">Cancellation token used to stop the loop.</param>
45         /// <returns>Task that signals the termination of the sequence.</returns>
46         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
47         /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
ForEachAsync(this IObservable<TSource> source, Action<TSource> onNext, CancellationToken cancellationToken)48         public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext, CancellationToken cancellationToken)
49         {
50             if (source == null)
51                 throw new ArgumentNullException("source");
52             if (onNext == null)
53                 throw new ArgumentNullException("onNext");
54 
55             return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
56         }
57 
58         /// <summary>
59         /// Invokes an action for each element in the observable sequence, incorporating the element's index, and returns a Task object that will get signaled when the sequence terminates.
60         /// </summary>
61         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
62         /// <param name="source">Source sequence.</param>
63         /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
64         /// <returns>Task that signals the termination of the sequence.</returns>
65         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
66         /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
ForEachAsync(this IObservable<TSource> source, Action<TSource, int> onNext)67         public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext)
68         {
69             if (source == null)
70                 throw new ArgumentNullException("source");
71             if (onNext == null)
72                 throw new ArgumentNullException("onNext");
73 
74             return s_impl.ForEachAsync<TSource>(source, onNext);
75         }
76 
77         /// <summary>
78         /// Invokes an action for each element in the observable sequence, incorporating the element's index, and returns a Task object that will get signaled when the sequence terminates.
79         /// The loop can be quit prematurely by setting the specified cancellation token.
80         /// </summary>
81         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
82         /// <param name="source">Source sequence.</param>
83         /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
84         /// <param name="cancellationToken">Cancellation token used to stop the loop.</param>
85         /// <returns>Task that signals the termination of the sequence.</returns>
86         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
87         /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
ForEachAsync(this IObservable<TSource> source, Action<TSource, int> onNext, CancellationToken cancellationToken)88         public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext, CancellationToken cancellationToken)
89         {
90             if (source == null)
91                 throw new ArgumentNullException("source");
92             if (onNext == null)
93                 throw new ArgumentNullException("onNext");
94 
95             return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
96         }
97 #endif
98 
99         #endregion
100 
101         #region + Case +
102 
103         /// <summary>
104         /// Uses <paramref name="selector"/> to determine which source in <paramref name="sources"/> to return, choosing <paramref name="defaultSource"/> if no match is found.
105         /// </summary>
106         /// <typeparam name="TValue">The type of the value returned by the selector function, used to look up the resulting source.</typeparam>
107         /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
108         /// <param name="selector">Selector function invoked to determine the source to lookup in the <paramref name="sources"/> dictionary.</param>
109         /// <param name="sources">Dictionary of sources to select from based on the <paramref name="selector"/> invocation result.</param>
110         /// <param name="defaultSource">Default source to select in case no matching source in <paramref name="sources"/> is found.</param>
111         /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or <paramref name="defaultSource"/> if no match is found.</returns>
112         /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="defaultSource"/> is null.</exception>
Case(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IObservable<TResult> defaultSource)113         public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IObservable<TResult> defaultSource)
114         {
115             if (selector == null)
116                 throw new ArgumentNullException("selector");
117             if (sources == null)
118                 throw new ArgumentNullException("sources");
119             if (defaultSource == null)
120                 throw new ArgumentNullException("defaultSource");
121 
122             return s_impl.Case<TValue, TResult>(selector, sources, defaultSource);
123         }
124 
125         /// <summary>
126         /// Uses <paramref name="selector"/> to determine which source in <paramref name="sources"/> to return, choosing an empty sequence on the specified scheduler if no match is found.
127         /// </summary>
128         /// <typeparam name="TValue">The type of the value returned by the selector function, used to look up the resulting source.</typeparam>
129         /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
130         /// <param name="selector">Selector function invoked to determine the source to lookup in the <paramref name="sources"/> dictionary.</param>
131         /// <param name="sources">Dictionary of sources to select from based on the <paramref name="selector"/> invocation result.</param>
132         /// <param name="scheduler">Scheduler to generate an empty sequence on in case no matching source in <paramref name="sources"/> is found.</param>
133         /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
134         /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="scheduler"/> is null.</exception>
Case(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IScheduler scheduler)135         public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IScheduler scheduler)
136         {
137             if (selector == null)
138                 throw new ArgumentNullException("selector");
139             if (sources == null)
140                 throw new ArgumentNullException("sources");
141             if (scheduler == null)
142                 throw new ArgumentNullException("scheduler");
143 
144             return s_impl.Case<TValue, TResult>(selector, sources, scheduler);
145         }
146 
147         /// <summary>
148         /// Uses <paramref name="selector"/> to determine which source in <paramref name="sources"/> to return, choosing an empty sequence if no match is found.
149         /// </summary>
150         /// <typeparam name="TValue">The type of the value returned by the selector function, used to look up the resulting source.</typeparam>
151         /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
152         /// <param name="selector">Selector function invoked to determine the source to lookup in the <paramref name="sources"/> dictionary.</param>
153         /// <param name="sources">Dictionary of sources to select from based on the <paramref name="selector"/> invocation result.</param>
154         /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
155         /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> is null.</exception>
Case(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources)156         public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources)
157         {
158             if (selector == null)
159                 throw new ArgumentNullException("selector");
160             if (sources == null)
161                 throw new ArgumentNullException("sources");
162 
163             return s_impl.Case<TValue, TResult>(selector, sources);
164         }
165 
166         #endregion
167 
168         #region + DoWhile +
169 
170         /// <summary>
171         /// Repeats the given <paramref name="source"/> as long as the specified <paramref name="condition"/> holds, where the <paramref name="condition"/> is evaluated after each repeated <paramref name="source"/> completed.
172         /// </summary>
173         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
174         /// <param name="source">Source to repeat as long as the <paramref name="condition"/> function evaluates to true.</param>
175         /// <param name="condition">Condition that will be evaluated upon the completion of an iteration through the <paramref name="source"/>, to determine whether repetition of the source is required.</param>
176         /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence as long as the <paramref name="condition"/> holds.</returns>
177         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="condition"/> is null.</exception>
DoWhile(this IObservable<TSource> source, Func<bool> condition)178         public static IObservable<TSource> DoWhile<TSource>(this IObservable<TSource> source, Func<bool> condition)
179         {
180             if (source == null)
181                 throw new ArgumentNullException("source");
182             if (condition == null)
183                 throw new ArgumentNullException("condition");
184 
185             return s_impl.DoWhile<TSource>(source, condition);
186         }
187 
188         #endregion
189 
190         #region + For +
191 
192         /// <summary>
193         /// Concatenates the observable sequences obtained by running the <paramref name="resultSelector"/> for each element in the given enumerable <paramref name="source"/>.
194         /// </summary>
195         /// <typeparam name="TSource">The type of the elements in the enumerable source sequence.</typeparam>
196         /// <typeparam name="TResult">The type of the elements in the observable result sequence.</typeparam>
197         /// <param name="source">Enumerable source for which each element will be mapped onto an observable source that will be concatenated in the result sequence.</param>
198         /// <param name="resultSelector">Function to select an observable source for each element in the <paramref name="source"/>.</param>
199         /// <returns>The observable sequence obtained by concatenating the sources returned by <paramref name="resultSelector"/> for each element in the <paramref name="source"/>.</returns>
200         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
For(IEnumerable<TSource> source, Func<TSource, IObservable<TResult>> resultSelector)201         public static IObservable<TResult> For<TSource, TResult>(IEnumerable<TSource> source, Func<TSource, IObservable<TResult>> resultSelector)
202         {
203             if (source == null)
204                 throw new ArgumentNullException("source");
205             if (resultSelector == null)
206                 throw new ArgumentNullException("resultSelector");
207 
208             return s_impl.For<TSource, TResult>(source, resultSelector);
209         }
210 
211         #endregion
212 
213         #region + If +
214 
215         /// <summary>
216         /// If the specified <paramref name="condition"/> evaluates true, select the <paramref name="thenSource"/> sequence. Otherwise, select the <paramref name="elseSource"/> sequence.
217         /// </summary>
218         /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
219         /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
220         /// <param name="thenSource">Sequence returned in case <paramref name="condition"/> evaluates true.</param>
221         /// <param name="elseSource">Sequence returned in case <paramref name="condition"/> evaluates false.</param>
222         /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; <paramref name="elseSource"/> otherwise.</returns>
223         /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="elseSource"/> is null.</exception>
If(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)224         public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)
225         {
226             if (condition == null)
227                 throw new ArgumentNullException("condition");
228             if (thenSource == null)
229                 throw new ArgumentNullException("thenSource");
230             if (elseSource == null)
231                 throw new ArgumentNullException("elseSource");
232 
233             return s_impl.If<TResult>(condition, thenSource, elseSource);
234         }
235 
236         /// <summary>
237         /// If the specified <paramref name="condition"/> evaluates true, select the <paramref name="thenSource"/> sequence. Otherwise, return an empty sequence.
238         /// </summary>
239         /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
240         /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
241         /// <param name="thenSource">Sequence returned in case <paramref name="condition"/> evaluates true.</param>
242         /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
243         /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> is null.</exception>
If(Func<bool> condition, IObservable<TResult> thenSource)244         public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource)
245         {
246             if (condition == null)
247                 throw new ArgumentNullException("condition");
248             if (thenSource == null)
249                 throw new ArgumentNullException("thenSource");
250 
251             return s_impl.If<TResult>(condition, thenSource);
252         }
253 
254         /// <summary>
255         /// If the specified <paramref name="condition"/> evaluates true, select the <paramref name="thenSource"/> sequence. Otherwise, return an empty sequence generated on the specified scheduler.
256         /// </summary>
257         /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
258         /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
259         /// <param name="thenSource">Sequence returned in case <paramref name="condition"/> evaluates true.</param>
260         /// <param name="scheduler">Scheduler to generate an empty sequence on in case <paramref name="condition"/> evaluates false.</param>
261         /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
262         /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="scheduler"/> is null.</exception>
If(Func<bool> condition, IObservable<TResult> thenSource, IScheduler scheduler)263         public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IScheduler scheduler)
264         {
265             if (condition == null)
266                 throw new ArgumentNullException("condition");
267             if (thenSource == null)
268                 throw new ArgumentNullException("thenSource");
269             if (scheduler == null)
270                 throw new ArgumentNullException("scheduler");
271 
272             return s_impl.If<TResult>(condition, thenSource, scheduler);
273         }
274 
275         #endregion
276 
277         #region + While +
278 
279         /// <summary>
280         /// Repeats the given <paramref name="source"/> as long as the specified <paramref name="condition"/> holds, where the <paramref name="condition"/> is evaluated before each repeated <paramref name="source"/> is subscribed to.
281         /// </summary>
282         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
283         /// <param name="source">Source to repeat as long as the <paramref name="condition"/> function evaluates to true.</param>
284         /// <param name="condition">Condition that will be evaluated before subscription to the <paramref name="source"/>, to determine whether repetition of the source is required.</param>
285         /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence as long as the <paramref name="condition"/> holds.</returns>
286         /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="source"/> is null.</exception>
While(Func<bool> condition, IObservable<TSource> source)287         public static IObservable<TSource> While<TSource>(Func<bool> condition, IObservable<TSource> source)
288         {
289             if (condition == null)
290                 throw new ArgumentNullException("condition");
291             if (source == null)
292                 throw new ArgumentNullException("source");
293 
294             return s_impl.While<TSource>(condition, source);
295         }
296 
297         #endregion
298     }
299 }