1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #if !NO_PERF
4 using System.Collections.Generic;
5 using System.Reactive.Disposables;
6 
7 #if !NO_TPL
8 using System.Threading;
9 using System.Threading.Tasks;
10 #endif
11 
12 namespace System.Reactive.Linq.ObservableImpl
13 {
14     class SelectMany<TSource, TCollection, TResult> : Producer<TResult>
15     {
16         private readonly IObservable<TSource> _source;
17         private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
18         private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI;
19         private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
20         private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI;
21         private readonly Func<TSource, TCollection, TResult> _resultSelector;
22         private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI;
23 
SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)24         public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
25         {
26             _source = source;
27             _collectionSelector = collectionSelector;
28             _resultSelector = resultSelector;
29         }
30 
SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)31         public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
32         {
33             _source = source;
34             _collectionSelectorI = collectionSelector;
35             _resultSelectorI = resultSelector;
36         }
37 
SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)38         public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
39         {
40             _source = source;
41             _collectionSelectorE = collectionSelector;
42             _resultSelector = resultSelector;
43         }
44 
SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)45         public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
46         {
47             _source = source;
48             _collectionSelectorEI = collectionSelector;
49             _resultSelectorI = resultSelector;
50         }
51 
52 #if !NO_TPL
53         private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
54         private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI;
55         private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI;
56 
SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)57         public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
58         {
59             _source = source;
60             _collectionSelectorT = collectionSelector;
61             _resultSelector = resultSelector;
62         }
63 
SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)64         public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
65         {
66             _source = source;
67             _collectionSelectorTI = collectionSelector;
68             _resultSelectorTI = resultSelector;
69         }
70 #endif
71 
Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)72         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
73         {
74             if (_collectionSelector != null)
75             {
76                 var sink = new _(this, observer, cancel);
77                 setSink(sink);
78                 return sink.Run();
79             }
80             else if (_collectionSelectorI != null)
81             {
82                 var sink = new IndexSelectorImpl(this, observer, cancel);
83                 setSink(sink);
84                 return sink.Run();
85             }
86 #if !NO_TPL
87             else if (_collectionSelectorT != null)
88             {
89                 var sink = new SelectManyImpl(this, observer, cancel);
90                 setSink(sink);
91                 return sink.Run();
92             }
93             else if (_collectionSelectorTI != null)
94             {
95                 var sink = new Sigma(this, observer, cancel);
96                 setSink(sink);
97                 return sink.Run();
98             }
99 #endif
100             else if (_collectionSelectorE != null)
101             {
102                 var sink = new NoSelectorImpl(this, observer, cancel);
103                 setSink(sink);
104                 return _source.SubscribeSafe(sink);
105             }
106             else
107             {
108                 var sink = new Omega(this, observer, cancel);
109                 setSink(sink);
110                 return _source.SubscribeSafe(sink);
111             }
112         }
113 
114         class _ : Sink<TResult>, IObserver<TSource>
115         {
116             private readonly SelectMany<TSource, TCollection, TResult> _parent;
117 
_(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)118             public _(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
119                 : base(observer, cancel)
120             {
121                 _parent = parent;
122             }
123 
124             private object _gate;
125             private bool _isStopped;
126             private CompositeDisposable _group;
127             private SingleAssignmentDisposable _sourceSubscription;
128 
Run()129             public IDisposable Run()
130             {
131                 _gate = new object();
132                 _isStopped = false;
133                 _group = new CompositeDisposable();
134 
135                 _sourceSubscription = new SingleAssignmentDisposable();
136                 _group.Add(_sourceSubscription);
137                 _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
138 
139                 return _group;
140             }
141 
OnNext(TSource value)142             public void OnNext(TSource value)
143             {
144                 var collection = default(IObservable<TCollection>);
145 
146                 try
147                 {
148                     collection = _parent._collectionSelector(value);
149                 }
150                 catch (Exception ex)
151                 {
152                     lock (_gate)
153                     {
154                         base._observer.OnError(ex);
155                         base.Dispose();
156                     }
157                     return;
158                 }
159 
160                 var innerSubscription = new SingleAssignmentDisposable();
161                 _group.Add(innerSubscription);
162                 innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription));
163             }
164 
OnError(Exception error)165             public void OnError(Exception error)
166             {
167                 lock (_gate)
168                 {
169                     base._observer.OnError(error);
170                     base.Dispose();
171                 }
172             }
173 
OnCompleted()174             public void OnCompleted()
175             {
176                 _isStopped = true;
177                 if (_group.Count == 1)
178                 {
179                     //
180                     // Notice there can be a race between OnCompleted of the source and any
181                     // of the inner sequences, where both see _group.Count == 1, and one is
182                     // waiting for the lock. There won't be a double OnCompleted observation
183                     // though, because the call to Dispose silences the observer by swapping
184                     // in a NopObserver<T>.
185                     //
186                     lock (_gate)
187                     {
188                         base._observer.OnCompleted();
189                         base.Dispose();
190                     }
191                 }
192                 else
193                 {
194                     _sourceSubscription.Dispose();
195                 }
196             }
197 
198             class Iter : IObserver<TCollection>
199             {
200                 private readonly _ _parent;
201                 private readonly TSource _value;
202                 private readonly IDisposable _self;
203 
Iter(_ parent, TSource value, IDisposable self)204                 public Iter(_ parent, TSource value, IDisposable self)
205                 {
206                     _parent = parent;
207                     _value = value;
208                     _self = self;
209                 }
210 
OnNext(TCollection value)211                 public void OnNext(TCollection value)
212                 {
213                     var res = default(TResult);
214 
215                     try
216                     {
217                         res = _parent._parent._resultSelector(_value, value);
218                     }
219                     catch (Exception ex)
220                     {
221                         lock (_parent._gate)
222                         {
223                             _parent._observer.OnError(ex);
224                             _parent.Dispose();
225                         }
226                         return;
227                     }
228 
229                     lock (_parent._gate)
230                         _parent._observer.OnNext(res);
231                 }
232 
OnError(Exception error)233                 public void OnError(Exception error)
234                 {
235                     lock (_parent._gate)
236                     {
237                         _parent._observer.OnError(error);
238                         _parent.Dispose();
239                     }
240                 }
241 
OnCompleted()242                 public void OnCompleted()
243                 {
244                     _parent._group.Remove(_self);
245                     if (_parent._isStopped && _parent._group.Count == 1)
246                     {
247                         //
248                         // Notice there can be a race between OnCompleted of the source and any
249                         // of the inner sequences, where both see _group.Count == 1, and one is
250                         // waiting for the lock. There won't be a double OnCompleted observation
251                         // though, because the call to Dispose silences the observer by swapping
252                         // in a NopObserver<T>.
253                         //
254                         lock (_parent._gate)
255                         {
256                             _parent._observer.OnCompleted();
257                             _parent.Dispose();
258                         }
259                     }
260                 }
261             }
262         }
263 
264         class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
265         {
266             private readonly SelectMany<TSource, TCollection, TResult> _parent;
267 
IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)268             public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
269                 : base(observer, cancel)
270             {
271                 _parent = parent;
272             }
273 
274             private object _gate;
275             private bool _isStopped;
276             private CompositeDisposable _group;
277             private SingleAssignmentDisposable _sourceSubscription;
278             private int _index;
279 
Run()280             public IDisposable Run()
281             {
282                 _gate = new object();
283                 _isStopped = false;
284                 _group = new CompositeDisposable();
285 
286                 _sourceSubscription = new SingleAssignmentDisposable();
287                 _group.Add(_sourceSubscription);
288                 _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
289 
290                 return _group;
291             }
292 
OnNext(TSource value)293             public void OnNext(TSource value)
294             {
295                 var index = checked(_index++);
296                 var collection = default(IObservable<TCollection>);
297 
298                 try
299                 {
300                     collection = _parent._collectionSelectorI(value, index);
301                 }
302                 catch (Exception ex)
303                 {
304                     lock (_gate)
305                     {
306                         base._observer.OnError(ex);
307                         base.Dispose();
308                     }
309                     return;
310                 }
311 
312                 var innerSubscription = new SingleAssignmentDisposable();
313                 _group.Add(innerSubscription);
314                 innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription));
315             }
316 
OnError(Exception error)317             public void OnError(Exception error)
318             {
319                 lock (_gate)
320                 {
321                     base._observer.OnError(error);
322                     base.Dispose();
323                 }
324             }
325 
OnCompleted()326             public void OnCompleted()
327             {
328                 _isStopped = true;
329                 if (_group.Count == 1)
330                 {
331                     //
332                     // Notice there can be a race between OnCompleted of the source and any
333                     // of the inner sequences, where both see _group.Count == 1, and one is
334                     // waiting for the lock. There won't be a double OnCompleted observation
335                     // though, because the call to Dispose silences the observer by swapping
336                     // in a NopObserver<T>.
337                     //
338                     lock (_gate)
339                     {
340                         base._observer.OnCompleted();
341                         base.Dispose();
342                     }
343                 }
344                 else
345                 {
346                     _sourceSubscription.Dispose();
347                 }
348             }
349 
350             class Iter : IObserver<TCollection>
351             {
352                 private readonly IndexSelectorImpl _parent;
353                 private readonly TSource _value;
354                 private readonly int _valueIndex;
355                 private readonly IDisposable _self;
356 
Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self)357                 public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self)
358                 {
359                     _parent = parent;
360                     _value = value;
361                     _valueIndex = index;
362                     _self = self;
363                 }
364 
365                 private int _index;
366 
OnNext(TCollection value)367                 public void OnNext(TCollection value)
368                 {
369                     var res = default(TResult);
370 
371                     try
372                     {
373                         res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++));
374                     }
375                     catch (Exception ex)
376                     {
377                         lock (_parent._gate)
378                         {
379                             _parent._observer.OnError(ex);
380                             _parent.Dispose();
381                         }
382                         return;
383                     }
384 
385                     lock (_parent._gate)
386                         _parent._observer.OnNext(res);
387                 }
388 
OnError(Exception error)389                 public void OnError(Exception error)
390                 {
391                     lock (_parent._gate)
392                     {
393                         _parent._observer.OnError(error);
394                         _parent.Dispose();
395                     }
396                 }
397 
OnCompleted()398                 public void OnCompleted()
399                 {
400                     _parent._group.Remove(_self);
401                     if (_parent._isStopped && _parent._group.Count == 1)
402                     {
403                         //
404                         // Notice there can be a race between OnCompleted of the source and any
405                         // of the inner sequences, where both see _group.Count == 1, and one is
406                         // waiting for the lock. There won't be a double OnCompleted observation
407                         // though, because the call to Dispose silences the observer by swapping
408                         // in a NopObserver<T>.
409                         //
410                         lock (_parent._gate)
411                         {
412                             _parent._observer.OnCompleted();
413                             _parent.Dispose();
414                         }
415                     }
416                 }
417             }
418         }
419 
420         class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
421         {
422             private readonly SelectMany<TSource, TCollection, TResult> _parent;
423 
NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)424             public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
425                 : base(observer, cancel)
426             {
427                 _parent = parent;
428             }
429 
OnNext(TSource value)430             public void OnNext(TSource value)
431             {
432                 var xs = default(IEnumerable<TCollection>);
433                 try
434                 {
435                     xs = _parent._collectionSelectorE(value);
436                 }
437                 catch (Exception exception)
438                 {
439                     base._observer.OnError(exception);
440                     base.Dispose();
441                     return;
442                 }
443 
444                 var e = default(IEnumerator<TCollection>);
445                 try
446                 {
447                     e = xs.GetEnumerator();
448                 }
449                 catch (Exception exception)
450                 {
451                     base._observer.OnError(exception);
452                     base.Dispose();
453                     return;
454                 }
455 
456                 try
457                 {
458                     var hasNext = true;
459                     while (hasNext)
460                     {
461                         hasNext = false;
462                         var current = default(TResult);
463 
464                         try
465                         {
466                             hasNext = e.MoveNext();
467                             if (hasNext)
468                                 current = _parent._resultSelector(value, e.Current);
469                         }
470                         catch (Exception exception)
471                         {
472                             base._observer.OnError(exception);
473                             base.Dispose();
474                             return;
475                         }
476 
477                         if (hasNext)
478                             base._observer.OnNext(current);
479                     }
480                 }
481                 finally
482                 {
483                     if (e != null)
484                         e.Dispose();
485                 }
486             }
487 
OnError(Exception error)488             public void OnError(Exception error)
489             {
490                 base._observer.OnError(error);
491                 base.Dispose();
492             }
493 
OnCompleted()494             public void OnCompleted()
495             {
496                 base._observer.OnCompleted();
497                 base.Dispose();
498             }
499         }
500 
501         class Omega : Sink<TResult>, IObserver<TSource>
502         {
503             private readonly SelectMany<TSource, TCollection, TResult> _parent;
504 
Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)505             public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
506                 : base(observer, cancel)
507             {
508                 _parent = parent;
509             }
510 
511             private int _index;
512 
OnNext(TSource value)513             public void OnNext(TSource value)
514             {
515                 var index = checked(_index++);
516 
517                 var xs = default(IEnumerable<TCollection>);
518                 try
519                 {
520                     xs = _parent._collectionSelectorEI(value, index);
521                 }
522                 catch (Exception exception)
523                 {
524                     base._observer.OnError(exception);
525                     base.Dispose();
526                     return;
527                 }
528 
529                 var e = default(IEnumerator<TCollection>);
530                 try
531                 {
532                     e = xs.GetEnumerator();
533                 }
534                 catch (Exception exception)
535                 {
536                     base._observer.OnError(exception);
537                     base.Dispose();
538                     return;
539                 }
540 
541                 try
542                 {
543                     var eIndex = 0;
544                     var hasNext = true;
545                     while (hasNext)
546                     {
547                         hasNext = false;
548                         var current = default(TResult);
549 
550                         try
551                         {
552                             hasNext = e.MoveNext();
553                             if (hasNext)
554                                 current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++));
555                         }
556                         catch (Exception exception)
557                         {
558                             base._observer.OnError(exception);
559                             base.Dispose();
560                             return;
561                         }
562 
563                         if (hasNext)
564                             base._observer.OnNext(current);
565                     }
566                 }
567                 finally
568                 {
569                     if (e != null)
570                         e.Dispose();
571                 }
572             }
573 
OnError(Exception error)574             public void OnError(Exception error)
575             {
576                 base._observer.OnError(error);
577                 base.Dispose();
578             }
579 
OnCompleted()580             public void OnCompleted()
581             {
582                 base._observer.OnCompleted();
583                 base.Dispose();
584             }
585         }
586 
587 #if !NO_TPL
588 #pragma warning disable 0420
589         class SelectManyImpl : Sink<TResult>, IObserver<TSource>
590         {
591             private readonly SelectMany<TSource, TCollection, TResult> _parent;
592 
SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)593             public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
594                 : base(observer, cancel)
595             {
596                 _parent = parent;
597             }
598 
599             private object _gate;
600             private CancellationDisposable _cancel;
601             private volatile int _count;
602 
Run()603             public IDisposable Run()
604             {
605                 _gate = new object();
606                 _cancel = new CancellationDisposable();
607                 _count = 1;
608 
609                 return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
610             }
611 
OnNext(TSource value)612             public void OnNext(TSource value)
613             {
614                 var task = default(Task<TCollection>);
615                 try
616                 {
617                     Interlocked.Increment(ref _count);
618                     task = _parent._collectionSelectorT(value, _cancel.Token);
619                 }
620                 catch (Exception ex)
621                 {
622                     lock (_gate)
623                     {
624                         base._observer.OnError(ex);
625                         base.Dispose();
626                     }
627 
628                     return;
629                 }
630 
631                 if (task.IsCompleted)
632                 {
633                     OnCompletedTask(value, task);
634                 }
635                 else
636                 {
637                     AttachContinuation(value, task);
638                 }
639             }
640 
AttachContinuation(TSource value, Task<TCollection> task)641             private void AttachContinuation(TSource value, Task<TCollection> task)
642             {
643                 //
644                 // Separate method to avoid closure in synchronous completion case.
645                 //
646                 task.ContinueWith(t => OnCompletedTask(value, t));
647             }
648 
OnCompletedTask(TSource value, Task<TCollection> task)649             private void OnCompletedTask(TSource value, Task<TCollection> task)
650             {
651                 switch (task.Status)
652                 {
653                     case TaskStatus.RanToCompletion:
654                         {
655                             var res = default(TResult);
656                             try
657                             {
658                                 res = _parent._resultSelector(value, task.Result);
659                             }
660                             catch (Exception ex)
661                             {
662                                 lock (_gate)
663                                 {
664                                     base._observer.OnError(ex);
665                                     base.Dispose();
666                                 }
667 
668                                 return;
669                             }
670 
671                             lock (_gate)
672                                 base._observer.OnNext(res);
673 
674                             OnCompleted();
675                         }
676                         break;
677                     case TaskStatus.Faulted:
678                         {
679                             lock (_gate)
680                             {
681                                 base._observer.OnError(task.Exception.InnerException);
682                                 base.Dispose();
683                             }
684                         }
685                         break;
686                     case TaskStatus.Canceled:
687                         {
688                             if (!_cancel.IsDisposed)
689                             {
690                                 lock (_gate)
691                                 {
692                                     base._observer.OnError(new TaskCanceledException(task));
693                                     base.Dispose();
694                                 }
695                             }
696                         }
697                         break;
698                 }
699             }
700 
OnError(Exception error)701             public void OnError(Exception error)
702             {
703                 lock (_gate)
704                 {
705                     base._observer.OnError(error);
706                     base.Dispose();
707                 }
708             }
709 
OnCompleted()710             public void OnCompleted()
711             {
712                 if (Interlocked.Decrement(ref _count) == 0)
713                 {
714                     lock (_gate)
715                     {
716                         base._observer.OnCompleted();
717                         base.Dispose();
718                     }
719                 }
720             }
721         }
722 
723         class Sigma : Sink<TResult>, IObserver<TSource>
724         {
725             private readonly SelectMany<TSource, TCollection, TResult> _parent;
726 
Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)727             public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
728                 : base(observer, cancel)
729             {
730                 _parent = parent;
731             }
732 
733             private object _gate;
734             private CancellationDisposable _cancel;
735             private volatile int _count;
736             private int _index;
737 
Run()738             public IDisposable Run()
739             {
740                 _gate = new object();
741                 _cancel = new CancellationDisposable();
742                 _count = 1;
743 
744                 return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
745             }
746 
OnNext(TSource value)747             public void OnNext(TSource value)
748             {
749                 var index = checked(_index++);
750 
751                 var task = default(Task<TCollection>);
752                 try
753                 {
754                     Interlocked.Increment(ref _count);
755                     task = _parent._collectionSelectorTI(value, index, _cancel.Token);
756                 }
757                 catch (Exception ex)
758                 {
759                     lock (_gate)
760                     {
761                         base._observer.OnError(ex);
762                         base.Dispose();
763                     }
764 
765                     return;
766                 }
767 
768                 if (task.IsCompleted)
769                 {
770                     OnCompletedTask(value, index, task);
771                 }
772                 else
773                 {
774                     AttachContinuation(value, index, task);
775                 }
776             }
777 
AttachContinuation(TSource value, int index, Task<TCollection> task)778             private void AttachContinuation(TSource value, int index, Task<TCollection> task)
779             {
780                 //
781                 // Separate method to avoid closure in synchronous completion case.
782                 //
783                 task.ContinueWith(t => OnCompletedTask(value, index, t));
784             }
785 
OnCompletedTask(TSource value, int index, Task<TCollection> task)786             private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
787             {
788                 switch (task.Status)
789                 {
790                     case TaskStatus.RanToCompletion:
791                         {
792                             var res = default(TResult);
793                             try
794                             {
795                                 res = _parent._resultSelectorTI(value, index, task.Result);
796                             }
797                             catch (Exception ex)
798                             {
799                                 lock (_gate)
800                                 {
801                                     base._observer.OnError(ex);
802                                     base.Dispose();
803                                 }
804 
805                                 return;
806                             }
807 
808                             lock (_gate)
809                                 base._observer.OnNext(res);
810 
811                             OnCompleted();
812                         }
813                         break;
814                     case TaskStatus.Faulted:
815                         {
816                             lock (_gate)
817                             {
818                                 base._observer.OnError(task.Exception.InnerException);
819                                 base.Dispose();
820                             }
821                         }
822                         break;
823                     case TaskStatus.Canceled:
824                         {
825                             if (!_cancel.IsDisposed)
826                             {
827                                 lock (_gate)
828                                 {
829                                     base._observer.OnError(new TaskCanceledException(task));
830                                     base.Dispose();
831                                 }
832                             }
833                         }
834                         break;
835                 }
836             }
837 
OnError(Exception error)838             public void OnError(Exception error)
839             {
840                 lock (_gate)
841                 {
842                     base._observer.OnError(error);
843                     base.Dispose();
844                 }
845             }
846 
OnCompleted()847             public void OnCompleted()
848             {
849                 if (Interlocked.Decrement(ref _count) == 0)
850                 {
851                     lock (_gate)
852                     {
853                         base._observer.OnCompleted();
854                         base.Dispose();
855                     }
856                 }
857             }
858         }
859 #pragma warning restore 0420
860 #endif
861     }
862 
863     class SelectMany<TSource, TResult> : Producer<TResult>
864     {
865         private readonly IObservable<TSource> _source;
866         private readonly Func<TSource, IObservable<TResult>> _selector;
867         private readonly Func<TSource, int, IObservable<TResult>> _selectorI;
868         private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
869         private readonly Func<IObservable<TResult>> _selectorOnCompleted;
870         private readonly Func<TSource, IEnumerable<TResult>> _selectorE;
871         private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI;
872 
SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)873         public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
874         {
875             _source = source;
876             _selector = selector;
877         }
878 
SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)879         public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
880         {
881             _source = source;
882             _selectorI = selector;
883         }
884 
SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)885         public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
886         {
887             _source = source;
888             _selector = selector;
889             _selectorOnError = selectorOnError;
890             _selectorOnCompleted = selectorOnCompleted;
891         }
892 
SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)893         public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)
894         {
895             _source = source;
896             _selectorI = selector;
897             _selectorOnError = selectorOnError;
898             _selectorOnCompleted = selectorOnCompleted;
899         }
900 
SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)901         public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)
902         {
903             _source = source;
904             _selectorE = selector;
905         }
906 
SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)907         public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)
908         {
909             _source = source;
910             _selectorEI = selector;
911         }
912 
913 #if !NO_TPL
914         private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT;
915         private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI;
916 
SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)917         public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
918         {
919             _source = source;
920             _selectorT = selector;
921         }
922 
SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)923         public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
924         {
925             _source = source;
926             _selectorTI = selector;
927         }
928 #endif
929 
Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)930         protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
931         {
932             if (_selector != null)
933             {
934                 var sink = new _(this, observer, cancel);
935                 setSink(sink);
936                 return sink.Run();
937             }
938             else if (_selectorI != null)
939             {
940                 var sink = new IndexSelectorImpl(this, observer, cancel);
941                 setSink(sink);
942                 return sink.Run();
943             }
944 #if !NO_TPL
945             else if (_selectorT != null)
946             {
947                 var sink = new SelectManyImpl(this, observer, cancel);
948                 setSink(sink);
949                 return sink.Run();
950             }
951             else if (_selectorTI != null)
952             {
953                 var sink = new Sigma(this, observer, cancel);
954                 setSink(sink);
955                 return sink.Run();
956             }
957 #endif
958             else if (_selectorE != null)
959             {
960                 var sink = new NoSelectorImpl(this, observer, cancel);
961                 setSink(sink);
962                 return _source.SubscribeSafe(sink);
963             }
964             else
965             {
966                 var sink = new Omega(this, observer, cancel);
967                 setSink(sink);
968                 return _source.SubscribeSafe(sink);
969             }
970         }
971 
972         class _ : Sink<TResult>, IObserver<TSource>
973         {
974             private readonly SelectMany<TSource, TResult> _parent;
975 
_(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)976             public _(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
977                 : base(observer, cancel)
978             {
979                 _parent = parent;
980             }
981 
982             private object _gate;
983             private bool _isStopped;
984             private CompositeDisposable _group;
985             private SingleAssignmentDisposable _sourceSubscription;
986 
Run()987             public IDisposable Run()
988             {
989                 _gate = new object();
990                 _isStopped = false;
991                 _group = new CompositeDisposable();
992 
993                 _sourceSubscription = new SingleAssignmentDisposable();
994                 _group.Add(_sourceSubscription);
995                 _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
996 
997                 return _group;
998             }
999 
OnNext(TSource value)1000             public void OnNext(TSource value)
1001             {
1002                 var inner = default(IObservable<TResult>);
1003 
1004                 try
1005                 {
1006                     inner = _parent._selector(value);
1007                 }
1008                 catch (Exception ex)
1009                 {
1010                     lock (_gate)
1011                     {
1012                         base._observer.OnError(ex);
1013                         base.Dispose();
1014                     }
1015                     return;
1016                 }
1017 
1018                 SubscribeInner(inner);
1019             }
1020 
OnError(Exception error)1021             public void OnError(Exception error)
1022             {
1023                 if (_parent._selectorOnError != null)
1024                 {
1025                     var inner = default(IObservable<TResult>);
1026 
1027                     try
1028                     {
1029                         inner = _parent._selectorOnError(error);
1030                     }
1031                     catch (Exception ex)
1032                     {
1033                         lock (_gate)
1034                         {
1035                             base._observer.OnError(ex);
1036                             base.Dispose();
1037                         }
1038                         return;
1039                     }
1040 
1041                     SubscribeInner(inner);
1042 
1043                     Final();
1044                 }
1045                 else
1046                 {
1047                     lock (_gate)
1048                     {
1049                         base._observer.OnError(error);
1050                         base.Dispose();
1051                     }
1052                 }
1053             }
1054 
OnCompleted()1055             public void OnCompleted()
1056             {
1057                 if (_parent._selectorOnCompleted != null)
1058                 {
1059                     var inner = default(IObservable<TResult>);
1060 
1061                     try
1062                     {
1063                         inner = _parent._selectorOnCompleted();
1064                     }
1065                     catch (Exception ex)
1066                     {
1067                         lock (_gate)
1068                         {
1069                             base._observer.OnError(ex);
1070                             base.Dispose();
1071                         }
1072                         return;
1073                     }
1074 
1075                     SubscribeInner(inner);
1076                 }
1077 
1078                 Final();
1079             }
1080 
Final()1081             private void Final()
1082             {
1083                 _isStopped = true;
1084                 if (_group.Count == 1)
1085                 {
1086                     //
1087                     // Notice there can be a race between OnCompleted of the source and any
1088                     // of the inner sequences, where both see _group.Count == 1, and one is
1089                     // waiting for the lock. There won't be a double OnCompleted observation
1090                     // though, because the call to Dispose silences the observer by swapping
1091                     // in a NopObserver<T>.
1092                     //
1093                     lock (_gate)
1094                     {
1095                         base._observer.OnCompleted();
1096                         base.Dispose();
1097                     }
1098                 }
1099                 else
1100                 {
1101                     _sourceSubscription.Dispose();
1102                 }
1103             }
1104 
SubscribeInner(IObservable<TResult> inner)1105             private void SubscribeInner(IObservable<TResult> inner)
1106             {
1107                 var innerSubscription = new SingleAssignmentDisposable();
1108                 _group.Add(innerSubscription);
1109                 innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
1110             }
1111 
1112             class Iter : IObserver<TResult>
1113             {
1114                 private readonly _ _parent;
1115                 private readonly IDisposable _self;
1116 
Iter(_ parent, IDisposable self)1117                 public Iter(_ parent, IDisposable self)
1118                 {
1119                     _parent = parent;
1120                     _self = self;
1121                 }
1122 
OnNext(TResult value)1123                 public void OnNext(TResult value)
1124                 {
1125                     lock (_parent._gate)
1126                         _parent._observer.OnNext(value);
1127                 }
1128 
OnError(Exception error)1129                 public void OnError(Exception error)
1130                 {
1131                     lock (_parent._gate)
1132                     {
1133                         _parent._observer.OnError(error);
1134                         _parent.Dispose();
1135                     }
1136                 }
1137 
OnCompleted()1138                 public void OnCompleted()
1139                 {
1140                     _parent._group.Remove(_self);
1141                     if (_parent._isStopped && _parent._group.Count == 1)
1142                     {
1143                         //
1144                         // Notice there can be a race between OnCompleted of the source and any
1145                         // of the inner sequences, where both see _group.Count == 1, and one is
1146                         // waiting for the lock. There won't be a double OnCompleted observation
1147                         // though, because the call to Dispose silences the observer by swapping
1148                         // in a NopObserver<T>.
1149                         //
1150                         lock (_parent._gate)
1151                         {
1152                             _parent._observer.OnCompleted();
1153                             _parent.Dispose();
1154                         }
1155                     }
1156                 }
1157             }
1158         }
1159 
1160         class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
1161         {
1162             private readonly SelectMany<TSource, TResult> _parent;
1163 
IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)1164             public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
1165                 : base(observer, cancel)
1166             {
1167                 _parent = parent;
1168             }
1169 
1170             private object _gate;
1171             private bool _isStopped;
1172             private CompositeDisposable _group;
1173             private SingleAssignmentDisposable _sourceSubscription;
1174             private int _index;
1175 
Run()1176             public IDisposable Run()
1177             {
1178                 _gate = new object();
1179                 _isStopped = false;
1180                 _group = new CompositeDisposable();
1181 
1182                 _sourceSubscription = new SingleAssignmentDisposable();
1183                 _group.Add(_sourceSubscription);
1184                 _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);
1185 
1186                 return _group;
1187             }
1188 
OnNext(TSource value)1189             public void OnNext(TSource value)
1190             {
1191                 var inner = default(IObservable<TResult>);
1192 
1193                 try
1194                 {
1195                     inner = _parent._selectorI(value, checked(_index++));
1196                 }
1197                 catch (Exception ex)
1198                 {
1199                     lock (_gate)
1200                     {
1201                         base._observer.OnError(ex);
1202                         base.Dispose();
1203                     }
1204                     return;
1205                 }
1206 
1207                 SubscribeInner(inner);
1208             }
1209 
OnError(Exception error)1210             public void OnError(Exception error)
1211             {
1212                 if (_parent._selectorOnError != null)
1213                 {
1214                     var inner = default(IObservable<TResult>);
1215 
1216                     try
1217                     {
1218                         inner = _parent._selectorOnError(error);
1219                     }
1220                     catch (Exception ex)
1221                     {
1222                         lock (_gate)
1223                         {
1224                             base._observer.OnError(ex);
1225                             base.Dispose();
1226                         }
1227                         return;
1228                     }
1229 
1230                     SubscribeInner(inner);
1231 
1232                     Final();
1233                 }
1234                 else
1235                 {
1236                     lock (_gate)
1237                     {
1238                         base._observer.OnError(error);
1239                         base.Dispose();
1240                     }
1241                 }
1242             }
1243 
OnCompleted()1244             public void OnCompleted()
1245             {
1246                 if (_parent._selectorOnCompleted != null)
1247                 {
1248                     var inner = default(IObservable<TResult>);
1249 
1250                     try
1251                     {
1252                         inner = _parent._selectorOnCompleted();
1253                     }
1254                     catch (Exception ex)
1255                     {
1256                         lock (_gate)
1257                         {
1258                             base._observer.OnError(ex);
1259                             base.Dispose();
1260                         }
1261                         return;
1262                     }
1263 
1264                     SubscribeInner(inner);
1265                 }
1266 
1267                 Final();
1268             }
1269 
Final()1270             private void Final()
1271             {
1272                 _isStopped = true;
1273                 if (_group.Count == 1)
1274                 {
1275                     //
1276                     // Notice there can be a race between OnCompleted of the source and any
1277                     // of the inner sequences, where both see _group.Count == 1, and one is
1278                     // waiting for the lock. There won't be a double OnCompleted observation
1279                     // though, because the call to Dispose silences the observer by swapping
1280                     // in a NopObserver<T>.
1281                     //
1282                     lock (_gate)
1283                     {
1284                         base._observer.OnCompleted();
1285                         base.Dispose();
1286                     }
1287                 }
1288                 else
1289                 {
1290                     _sourceSubscription.Dispose();
1291                 }
1292             }
1293 
SubscribeInner(IObservable<TResult> inner)1294             private void SubscribeInner(IObservable<TResult> inner)
1295             {
1296                 var innerSubscription = new SingleAssignmentDisposable();
1297                 _group.Add(innerSubscription);
1298                 innerSubscription.Disposable = inner.SubscribeSafe(new Iter(this, innerSubscription));
1299             }
1300 
1301             class Iter : IObserver<TResult>
1302             {
1303                 private readonly IndexSelectorImpl _parent;
1304                 private readonly IDisposable _self;
1305 
Iter(IndexSelectorImpl parent, IDisposable self)1306                 public Iter(IndexSelectorImpl parent, IDisposable self)
1307                 {
1308                     _parent = parent;
1309                     _self = self;
1310                 }
1311 
OnNext(TResult value)1312                 public void OnNext(TResult value)
1313                 {
1314                     lock (_parent._gate)
1315                         _parent._observer.OnNext(value);
1316                 }
1317 
OnError(Exception error)1318                 public void OnError(Exception error)
1319                 {
1320                     lock (_parent._gate)
1321                     {
1322                         _parent._observer.OnError(error);
1323                         _parent.Dispose();
1324                     }
1325                 }
1326 
OnCompleted()1327                 public void OnCompleted()
1328                 {
1329                     _parent._group.Remove(_self);
1330                     if (_parent._isStopped && _parent._group.Count == 1)
1331                     {
1332                         //
1333                         // Notice there can be a race between OnCompleted of the source and any
1334                         // of the inner sequences, where both see _group.Count == 1, and one is
1335                         // waiting for the lock. There won't be a double OnCompleted observation
1336                         // though, because the call to Dispose silences the observer by swapping
1337                         // in a NopObserver<T>.
1338                         //
1339                         lock (_parent._gate)
1340                         {
1341                             _parent._observer.OnCompleted();
1342                             _parent.Dispose();
1343                         }
1344                     }
1345                 }
1346             }
1347         }
1348 
1349         class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
1350         {
1351             private readonly SelectMany<TSource, TResult> _parent;
1352 
NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)1353             public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
1354                 : base(observer, cancel)
1355             {
1356                 _parent = parent;
1357             }
1358 
OnNext(TSource value)1359             public void OnNext(TSource value)
1360             {
1361                 var xs = default(IEnumerable<TResult>);
1362                 try
1363                 {
1364                     xs = _parent._selectorE(value);
1365                 }
1366                 catch (Exception exception)
1367                 {
1368                     base._observer.OnError(exception);
1369                     base.Dispose();
1370                     return;
1371                 }
1372 
1373                 var e = default(IEnumerator<TResult>);
1374                 try
1375                 {
1376                     e = xs.GetEnumerator();
1377                 }
1378                 catch (Exception exception)
1379                 {
1380                     base._observer.OnError(exception);
1381                     base.Dispose();
1382                     return;
1383                 }
1384 
1385                 try
1386                 {
1387                     var hasNext = true;
1388                     while (hasNext)
1389                     {
1390                         hasNext = false;
1391                         var current = default(TResult);
1392 
1393                         try
1394                         {
1395                             hasNext = e.MoveNext();
1396                             if (hasNext)
1397                                 current = e.Current;
1398                         }
1399                         catch (Exception exception)
1400                         {
1401                             base._observer.OnError(exception);
1402                             base.Dispose();
1403                             return;
1404                         }
1405 
1406                         if (hasNext)
1407                             base._observer.OnNext(current);
1408                     }
1409                 }
1410                 finally
1411                 {
1412                     if (e != null)
1413                         e.Dispose();
1414                 }
1415             }
1416 
OnError(Exception error)1417             public void OnError(Exception error)
1418             {
1419                 base._observer.OnError(error);
1420                 base.Dispose();
1421             }
1422 
OnCompleted()1423             public void OnCompleted()
1424             {
1425                 base._observer.OnCompleted();
1426                 base.Dispose();
1427             }
1428         }
1429 
1430         class Omega : Sink<TResult>, IObserver<TSource>
1431         {
1432             private readonly SelectMany<TSource, TResult> _parent;
1433 
Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)1434             public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
1435                 : base(observer, cancel)
1436             {
1437                 _parent = parent;
1438             }
1439 
1440             private int _index;
1441 
OnNext(TSource value)1442             public void OnNext(TSource value)
1443             {
1444                 var xs = default(IEnumerable<TResult>);
1445                 try
1446                 {
1447                     xs = _parent._selectorEI(value, checked(_index++));
1448                 }
1449                 catch (Exception exception)
1450                 {
1451                     base._observer.OnError(exception);
1452                     base.Dispose();
1453                     return;
1454                 }
1455 
1456                 var e = default(IEnumerator<TResult>);
1457                 try
1458                 {
1459                     e = xs.GetEnumerator();
1460                 }
1461                 catch (Exception exception)
1462                 {
1463                     base._observer.OnError(exception);
1464                     base.Dispose();
1465                     return;
1466                 }
1467 
1468                 try
1469                 {
1470                     var hasNext = true;
1471                     while (hasNext)
1472                     {
1473                         hasNext = false;
1474                         var current = default(TResult);
1475 
1476                         try
1477                         {
1478                             hasNext = e.MoveNext();
1479                             if (hasNext)
1480                                 current = e.Current;
1481                         }
1482                         catch (Exception exception)
1483                         {
1484                             base._observer.OnError(exception);
1485                             base.Dispose();
1486                             return;
1487                         }
1488 
1489                         if (hasNext)
1490                             base._observer.OnNext(current);
1491                     }
1492                 }
1493                 finally
1494                 {
1495                     if (e != null)
1496                         e.Dispose();
1497                 }
1498             }
1499 
OnError(Exception error)1500             public void OnError(Exception error)
1501             {
1502                 base._observer.OnError(error);
1503                 base.Dispose();
1504             }
1505 
OnCompleted()1506             public void OnCompleted()
1507             {
1508                 base._observer.OnCompleted();
1509                 base.Dispose();
1510             }
1511         }
1512 
1513 #if !NO_TPL
1514 #pragma warning disable 0420
1515         class SelectManyImpl : Sink<TResult>, IObserver<TSource>
1516         {
1517             private readonly SelectMany<TSource, TResult> _parent;
1518 
SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)1519             public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
1520                 : base(observer, cancel)
1521             {
1522                 _parent = parent;
1523             }
1524 
1525             private object _gate;
1526             private CancellationDisposable _cancel;
1527             private volatile int _count;
1528 
Run()1529             public IDisposable Run()
1530             {
1531                 _gate = new object();
1532                 _cancel = new CancellationDisposable();
1533                 _count = 1;
1534 
1535                 return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
1536             }
1537 
OnNext(TSource value)1538             public void OnNext(TSource value)
1539             {
1540                 var task = default(Task<TResult>);
1541                 try
1542                 {
1543                     Interlocked.Increment(ref _count);
1544                     task = _parent._selectorT(value, _cancel.Token);
1545                 }
1546                 catch (Exception ex)
1547                 {
1548                     lock (_gate)
1549                     {
1550                         base._observer.OnError(ex);
1551                         base.Dispose();
1552                     }
1553 
1554                     return;
1555                 }
1556 
1557                 if (task.IsCompleted)
1558                 {
1559                     OnCompletedTask(task);
1560                 }
1561                 else
1562                 {
1563                     task.ContinueWith(OnCompletedTask);
1564                 }
1565             }
1566 
OnCompletedTask(Task<TResult> task)1567             private void OnCompletedTask(Task<TResult> task)
1568             {
1569                 switch (task.Status)
1570                 {
1571                     case TaskStatus.RanToCompletion:
1572                         {
1573                             lock (_gate)
1574                                 base._observer.OnNext(task.Result);
1575 
1576                             OnCompleted();
1577                         }
1578                         break;
1579                     case TaskStatus.Faulted:
1580                         {
1581                             lock (_gate)
1582                             {
1583                                 base._observer.OnError(task.Exception.InnerException);
1584                                 base.Dispose();
1585                             }
1586                         }
1587                         break;
1588                     case TaskStatus.Canceled:
1589                         {
1590                             if (!_cancel.IsDisposed)
1591                             {
1592                                 lock (_gate)
1593                                 {
1594                                     base._observer.OnError(new TaskCanceledException(task));
1595                                     base.Dispose();
1596                                }
1597                             }
1598                         }
1599                         break;
1600                 }
1601             }
1602 
OnError(Exception error)1603             public void OnError(Exception error)
1604             {
1605                 lock (_gate)
1606                 {
1607                     base._observer.OnError(error);
1608                     base.Dispose();
1609                 }
1610             }
1611 
OnCompleted()1612             public void OnCompleted()
1613             {
1614                 if (Interlocked.Decrement(ref _count) == 0)
1615                 {
1616                     lock (_gate)
1617                     {
1618                         base._observer.OnCompleted();
1619                         base.Dispose();
1620                     }
1621                 }
1622             }
1623         }
1624 
1625         class Sigma : Sink<TResult>, IObserver<TSource>
1626         {
1627             private readonly SelectMany<TSource, TResult> _parent;
1628 
Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)1629             public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
1630                 : base(observer, cancel)
1631             {
1632                 _parent = parent;
1633             }
1634 
1635             private object _gate;
1636             private CancellationDisposable _cancel;
1637             private volatile int _count;
1638             private int _index;
1639 
Run()1640             public IDisposable Run()
1641             {
1642                 _gate = new object();
1643                 _cancel = new CancellationDisposable();
1644                 _count = 1;
1645 
1646                 return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel);
1647             }
1648 
OnNext(TSource value)1649             public void OnNext(TSource value)
1650             {
1651                 var task = default(Task<TResult>);
1652                 try
1653                 {
1654                     Interlocked.Increment(ref _count);
1655                     task = _parent._selectorTI(value, checked(_index++), _cancel.Token);
1656                 }
1657                 catch (Exception ex)
1658                 {
1659                     lock (_gate)
1660                     {
1661                         base._observer.OnError(ex);
1662                         base.Dispose();
1663                     }
1664 
1665                     return;
1666                 }
1667 
1668                 if (task.IsCompleted)
1669                 {
1670                     OnCompletedTask(task);
1671                 }
1672                 else
1673                 {
1674                     task.ContinueWith(OnCompletedTask);
1675                 }
1676             }
1677 
OnCompletedTask(Task<TResult> task)1678             private void OnCompletedTask(Task<TResult> task)
1679             {
1680                 switch (task.Status)
1681                 {
1682                     case TaskStatus.RanToCompletion:
1683                         {
1684                             lock (_gate)
1685                                 base._observer.OnNext(task.Result);
1686 
1687                             OnCompleted();
1688                         }
1689                         break;
1690                     case TaskStatus.Faulted:
1691                         {
1692                             lock (_gate)
1693                             {
1694                                 base._observer.OnError(task.Exception.InnerException);
1695                                 base.Dispose();
1696                             }
1697                         }
1698                         break;
1699                     case TaskStatus.Canceled:
1700                         {
1701                             if (!_cancel.IsDisposed)
1702                             {
1703                                 lock (_gate)
1704                                 {
1705                                     base._observer.OnError(new TaskCanceledException(task));
1706                                     base.Dispose();
1707                                 }
1708                             }
1709                         }
1710                         break;
1711                 }
1712             }
1713 
OnError(Exception error)1714             public void OnError(Exception error)
1715             {
1716                 lock (_gate)
1717                 {
1718                     base._observer.OnError(error);
1719                     base.Dispose();
1720                 }
1721             }
1722 
OnCompleted()1723             public void OnCompleted()
1724             {
1725                 if (Interlocked.Decrement(ref _count) == 0)
1726                 {
1727                     lock (_gate)
1728                     {
1729                         base._observer.OnCompleted();
1730                         base.Dispose();
1731                     }
1732                 }
1733             }
1734         }
1735 #pragma warning restore 0420
1736 #endif
1737     }
1738 }
1739 #endif