// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System.Collections.Generic;
using System.Reactive.Disposables;

#if !NO_TPL
using System.Threading;
using System.Threading.Tasks;
#endif

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer for the SelectMany overloads that take a collection selector and a result selector.
    /// Every constructor stores exactly one collection selector (all others stay null); Run tests
    /// which one is non-null to choose the sink that handles that overload.
    /// </summary>
    class SelectMany<TSource, TCollection, TResult> : Producer<TResult>
    {
        private readonly IObservable<TSource> _source;

        // Collection selectors: observable / observable with index / enumerable / enumerable with index.
        private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
        private readonly Func<TSource, int, IObservable<TCollection>> _collectionSelectorI;
        private readonly Func<TSource, IEnumerable<TCollection>> _collectionSelectorE;
        private readonly Func<TSource, int, IEnumerable<TCollection>> _collectionSelectorEI;

        // Result selectors: without indices / with source index and inner index.
        private readonly Func<TSource, TCollection, TResult> _resultSelector;
        private readonly Func<TSource, int, TCollection, int, TResult> _resultSelectorI;

        /// <summary>Observable collection selector, no indices.</summary>
        public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            _source = source;
            _collectionSelector = collectionSelector;
            _resultSelector = resultSelector;
        }

        /// <summary>Observable collection selector, with source and inner element indices.</summary>
        public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
        {
            _source = source;
            _collectionSelectorI = collectionSelector;
            _resultSelectorI = resultSelector;
        }

        /// <summary>Enumerable collection selector, no indices.</summary>
        public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            _source = source;
            _collectionSelectorE = collectionSelector;
            _resultSelector = resultSelector;
        }

        /// <summary>Enumerable collection selector, with source and inner element indices.</summary>
        public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
        {
            _source = source;
            _collectionSelectorEI = collectionSelector;
            _resultSelectorI = resultSelector;
        }

#if !NO_TPL
        // Task-returning collection selectors (without / with source index) and the
        // result selector used by the indexed task overload.
        private readonly Func<TSource, CancellationToken, Task<TCollection>> _collectionSelectorT;
        private readonly Func<TSource, int, CancellationToken, Task<TCollection>> _collectionSelectorTI;
        private readonly Func<TSource, int, TCollection, TResult> _resultSelectorTI;

        /// <summary>Task-returning collection selector, no index.</summary>
        public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
        {
            _source = source;
            _collectionSelectorT = collectionSelector;
            _resultSelector = resultSelector;
        }

        /// <summary>Task-returning collection selector, with source index.</summary>
        public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TCollection>> collectionSelector, Func<TSource, int, TCollection, TResult> resultSelector)
        {
            _source = source;
            _collectionSelectorTI = collectionSelector;
            _resultSelectorTI = resultSelector;
        }
#endif

        /// <summary>
        /// Creates the sink matching whichever collection selector was supplied at construction,
        /// registers it through <paramref name="setSink"/>, and starts it.
        /// </summary>
        /// <param name="observer">Downstream observer receiving the projected results.</param>
        /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
        /// <param name="setSink">Callback that receives the created sink.</param>
        /// <returns>The subscription produced by the sink (or by subscribing the sink to the source).</returns>
        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            if (_collectionSelector != null)
            {
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else if (_collectionSelectorI != null)
            {
                var sink = new IndexSelectorImpl(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
#if !NO_TPL
            else if (_collectionSelectorT != null)
            {
                var sink = new SelectManyImpl(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else if (_collectionSelectorTI != null)
            {
                var sink = new Sigma(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
#endif
            else if (_collectionSelectorE != null)
            {
                // The enumerable sinks have no Run method; they are subscribed to the source directly.
                var sink = new NoSelectorImpl(this, observer, cancel);
                setSink(sink);
                return _source.SubscribeSafe(sink);
            }
            else
            {
                var sink = new Omega(this, observer, cancel);
                setSink(sink);
                return _source.SubscribeSafe(sink);
            }
        }

        /// <summary>
        /// Sink for the observable collection selector without indices. Subscribes to every inner
        /// sequence returned by the selector and forwards projected elements under a shared gate.
        /// </summary>
        class _ : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TCollection, TResult> _parent;

            public _(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;

            // Written by the source's OnCompleted and read by inner observers' OnCompleted outside
            // of _gate (see the race described in OnCompleted), hence volatile.
            private volatile bool _isStopped;

            // Holds the source subscription plus one entry per active inner subscription.
            private CompositeDisposable _group;
            private SingleAssignmentDisposable _sourceSubscription;

            /// <summary>Initializes the sink state and subscribes to the source.</summary>
            /// <returns>The group containing the source subscription and all inner subscriptions.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _isStopped = false;
                _group = new CompositeDisposable();

                _sourceSubscription = new SingleAssignmentDisposable();
                _group.Add(_sourceSubscription);
                _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

                return _group;
            }

            /// <summary>Maps the source element to an inner sequence and subscribes to it.</summary>
            public void OnNext(TSource value)
            {
                var collection = default(IObservable<TCollection>);

                try
                {
                    collection = _parent._collectionSelector(value);
                }
                catch (Exception ex)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(ex);
                        base.Dispose();
                    }
                    return;
                }

                // Register the inner subscription in the group before subscribing, so that an inner
                // sequence completing during Subscribe can remove itself.
                var innerSubscription = new SingleAssignmentDisposable();
                _group.Add(innerSubscription);
                innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, innerSubscription));
            }

            /// <summary>Forwards a source error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Marks the source as stopped. Completes downstream when no inner subscription is left
            /// (only the source subscription remains in the group); otherwise only the source
            /// subscription is disposed.
            /// </summary>
            public void OnCompleted()
            {
                _isStopped = true;
                if (_group.Count == 1)
                {
                    //
                    // Notice there can be a race between OnCompleted of the source and any
                    // of the inner sequences, where both see _group.Count == 1, and one is
                    // waiting for the lock. There won't be a double OnCompleted observation
                    // though, because the call to Dispose silences the observer by swapping
                    // in a NopObserver<T>.
                    //
                    lock (_gate)
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                    }
                }
                else
                {
                    _sourceSubscription.Dispose();
                }
            }

            /// <summary>Observer for one inner sequence; remembers the source element that produced it.</summary>
            class Iter : IObserver<TCollection>
            {
                private readonly _ _parent;
                private readonly TSource _value;
                private readonly IDisposable _self;

                public Iter(_ parent, TSource value, IDisposable self)
                {
                    _parent = parent;
                    _value = value;
                    _self = self;
                }

                /// <summary>Applies the result selector and forwards the result under the gate.</summary>
                public void OnNext(TCollection value)
                {
                    var res = default(TResult);

                    try
                    {
                        res = _parent._parent._resultSelector(_value, value);
                    }
                    catch (Exception ex)
                    {
                        lock (_parent._gate)
                        {
                            _parent._observer.OnError(ex);
                            _parent.Dispose();
                        }
                        return;
                    }

                    lock (_parent._gate)
                    {
                        _parent._observer.OnNext(res);
                    }
                }

                /// <summary>Forwards an inner error downstream and disposes the sink.</summary>
                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                /// <summary>
                /// Removes this inner subscription from the group and completes downstream when the
                /// source has stopped and no other inner subscription remains.
                /// </summary>
                public void OnCompleted()
                {
                    _parent._group.Remove(_self);
                    if (_parent._isStopped && _parent._group.Count == 1)
                    {
                        //
                        // Notice there can be a race between OnCompleted of the source and any
                        // of the inner sequences, where both see _group.Count == 1, and one is
                        // waiting for the lock. There won't be a double OnCompleted observation
                        // though, because the call to Dispose silences the observer by swapping
                        // in a NopObserver<T>.
                        //
                        lock (_parent._gate)
                        {
                            _parent._observer.OnCompleted();
                            _parent.Dispose();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Sink for the observable collection selector with indices. Same flow as the non-indexed
        /// sink, but passes the source element index to the collection selector and both the
        /// source index and the inner element index to the result selector.
        /// </summary>
        class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TCollection, TResult> _parent;

            public IndexSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;

            // Written by the source's OnCompleted and read by inner observers' OnCompleted outside
            // of _gate (see the race described in OnCompleted), hence volatile.
            private volatile bool _isStopped;

            // Holds the source subscription plus one entry per active inner subscription.
            private CompositeDisposable _group;
            private SingleAssignmentDisposable _sourceSubscription;

            // Index of the next source element; incremented with overflow checking.
            private int _index;

            /// <summary>Initializes the sink state and subscribes to the source.</summary>
            /// <returns>The group containing the source subscription and all inner subscriptions.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _isStopped = false;
                _group = new CompositeDisposable();

                _sourceSubscription = new SingleAssignmentDisposable();
                _group.Add(_sourceSubscription);
                _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

                return _group;
            }

            /// <summary>Maps the source element and its index to an inner sequence and subscribes to it.</summary>
            public void OnNext(TSource value)
            {
                var index = checked(_index++);
                var collection = default(IObservable<TCollection>);

                try
                {
                    collection = _parent._collectionSelectorI(value, index);
                }
                catch (Exception ex)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(ex);
                        base.Dispose();
                    }
                    return;
                }

                var innerSubscription = new SingleAssignmentDisposable();
                _group.Add(innerSubscription);
                innerSubscription.Disposable = collection.SubscribeSafe(new Iter(this, value, index, innerSubscription));
            }

            /// <summary>Forwards a source error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Marks the source as stopped. Completes downstream when no inner subscription is left;
            /// otherwise only the source subscription is disposed.
            /// </summary>
            public void OnCompleted()
            {
                _isStopped = true;
                if (_group.Count == 1)
                {
                    //
                    // Notice there can be a race between OnCompleted of the source and any
                    // of the inner sequences, where both see _group.Count == 1, and one is
                    // waiting for the lock. There won't be a double OnCompleted observation
                    // though, because the call to Dispose silences the observer by swapping
                    // in a NopObserver<T>.
                    //
                    lock (_gate)
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                    }
                }
                else
                {
                    _sourceSubscription.Dispose();
                }
            }

            /// <summary>
            /// Observer for one inner sequence; remembers the source element and its index, and
            /// counts the inner elements it has seen.
            /// </summary>
            class Iter : IObserver<TCollection>
            {
                private readonly IndexSelectorImpl _parent;
                private readonly TSource _value;
                private readonly int _valueIndex;
                private readonly IDisposable _self;

                public Iter(IndexSelectorImpl parent, TSource value, int index, IDisposable self)
                {
                    _parent = parent;
                    _value = value;
                    _valueIndex = index;
                    _self = self;
                }

                // Index of the next element within this inner sequence.
                private int _index;

                /// <summary>Applies the indexed result selector and forwards the result under the gate.</summary>
                public void OnNext(TCollection value)
                {
                    var res = default(TResult);

                    try
                    {
                        // The checked increment is inside the try, so an overflow is reported via OnError.
                        res = _parent._parent._resultSelectorI(_value, _valueIndex, value, checked(_index++));
                    }
                    catch (Exception ex)
                    {
                        lock (_parent._gate)
                        {
                            _parent._observer.OnError(ex);
                            _parent.Dispose();
                        }
                        return;
                    }

                    lock (_parent._gate)
                    {
                        _parent._observer.OnNext(res);
                    }
                }

                /// <summary>Forwards an inner error downstream and disposes the sink.</summary>
                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                /// <summary>
                /// Removes this inner subscription from the group and completes downstream when the
                /// source has stopped and no other inner subscription remains.
                /// </summary>
                public void OnCompleted()
                {
                    _parent._group.Remove(_self);
                    if (_parent._isStopped && _parent._group.Count == 1)
                    {
                        //
                        // Notice there can be a race between OnCompleted of the source and any
                        // of the inner sequences, where both see _group.Count == 1, and one is
                        // waiting for the lock. There won't be a double OnCompleted observation
                        // though, because the call to Dispose silences the observer by swapping
                        // in a NopObserver<T>.
                        //
                        lock (_parent._gate)
                        {
                            _parent._observer.OnCompleted();
                            _parent.Dispose();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Sink for the enumerable collection selector without indices. Each source element is
        /// expanded inside OnNext by enumerating the selected collection; this sink takes no lock.
        /// </summary>
        class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TCollection, TResult> _parent;

            public NoSelectorImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Selects the collection for <paramref name="value"/>, enumerates it, and forwards the
            /// result selector's output for each element. Any exception from the selectors or the
            /// enumerator is sent downstream via OnError and the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                var xs = default(IEnumerable<TCollection>);
                try
                {
                    xs = _parent._collectionSelectorE(value);
                }
                catch (Exception exception)
                {
                    base._observer.OnError(exception);
                    base.Dispose();
                    return;
                }

                var e = default(IEnumerator<TCollection>);
                try
                {
                    e = xs.GetEnumerator();
                }
                catch (Exception exception)
                {
                    base._observer.OnError(exception);
                    base.Dispose();
                    return;
                }

                try
                {
                    var hasNext = true;
                    while (hasNext)
                    {
                        hasNext = false;
                        var current = default(TResult);

                        // MoveNext/Current/result selector run inside the try; the downstream
                        // OnNext call below is deliberately kept outside of it.
                        try
                        {
                            hasNext = e.MoveNext();
                            if (hasNext)
                            {
                                current = _parent._resultSelector(value, e.Current);
                            }
                        }
                        catch (Exception exception)
                        {
                            base._observer.OnError(exception);
                            base.Dispose();
                            return;
                        }

                        if (hasNext)
                        {
                            base._observer.OnNext(current);
                        }
                    }
                }
                finally
                {
                    if (e != null)
                    {
                        e.Dispose();
                    }
                }
            }

            /// <summary>Forwards the source error downstream and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Forwards completion downstream and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
500 501 class Omega : Sink<TResult>, IObserver<TSource> 502 { 503 private readonly SelectMany<TSource, TCollection, TResult> _parent; 504 Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)505 public Omega(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) 506 : base(observer, cancel) 507 { 508 _parent = parent; 509 } 510 511 private int _index; 512 OnNext(TSource value)513 public void OnNext(TSource value) 514 { 515 var index = checked(_index++); 516 517 var xs = default(IEnumerable<TCollection>); 518 try 519 { 520 xs = _parent._collectionSelectorEI(value, index); 521 } 522 catch (Exception exception) 523 { 524 base._observer.OnError(exception); 525 base.Dispose(); 526 return; 527 } 528 529 var e = default(IEnumerator<TCollection>); 530 try 531 { 532 e = xs.GetEnumerator(); 533 } 534 catch (Exception exception) 535 { 536 base._observer.OnError(exception); 537 base.Dispose(); 538 return; 539 } 540 541 try 542 { 543 var eIndex = 0; 544 var hasNext = true; 545 while (hasNext) 546 { 547 hasNext = false; 548 var current = default(TResult); 549 550 try 551 { 552 hasNext = e.MoveNext(); 553 if (hasNext) 554 current = _parent._resultSelectorI(value, index, e.Current, checked(eIndex++)); 555 } 556 catch (Exception exception) 557 { 558 base._observer.OnError(exception); 559 base.Dispose(); 560 return; 561 } 562 563 if (hasNext) 564 base._observer.OnNext(current); 565 } 566 } 567 finally 568 { 569 if (e != null) 570 e.Dispose(); 571 } 572 } 573 OnError(Exception error)574 public void OnError(Exception error) 575 { 576 base._observer.OnError(error); 577 base.Dispose(); 578 } 579 OnCompleted()580 public void OnCompleted() 581 { 582 base._observer.OnCompleted(); 583 base.Dispose(); 584 } 585 } 586 587 #if !NO_TPL 588 #pragma warning disable 0420 589 class SelectManyImpl : Sink<TResult>, IObserver<TSource> 590 { 591 private readonly SelectMany<TSource, TCollection, TResult> 
_parent; 592 SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)593 public SelectManyImpl(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) 594 : base(observer, cancel) 595 { 596 _parent = parent; 597 } 598 599 private object _gate; 600 private CancellationDisposable _cancel; 601 private volatile int _count; 602 Run()603 public IDisposable Run() 604 { 605 _gate = new object(); 606 _cancel = new CancellationDisposable(); 607 _count = 1; 608 609 return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); 610 } 611 OnNext(TSource value)612 public void OnNext(TSource value) 613 { 614 var task = default(Task<TCollection>); 615 try 616 { 617 Interlocked.Increment(ref _count); 618 task = _parent._collectionSelectorT(value, _cancel.Token); 619 } 620 catch (Exception ex) 621 { 622 lock (_gate) 623 { 624 base._observer.OnError(ex); 625 base.Dispose(); 626 } 627 628 return; 629 } 630 631 if (task.IsCompleted) 632 { 633 OnCompletedTask(value, task); 634 } 635 else 636 { 637 AttachContinuation(value, task); 638 } 639 } 640 AttachContinuation(TSource value, Task<TCollection> task)641 private void AttachContinuation(TSource value, Task<TCollection> task) 642 { 643 // 644 // Separate method to avoid closure in synchronous completion case. 
645 // 646 task.ContinueWith(t => OnCompletedTask(value, t)); 647 } 648 OnCompletedTask(TSource value, Task<TCollection> task)649 private void OnCompletedTask(TSource value, Task<TCollection> task) 650 { 651 switch (task.Status) 652 { 653 case TaskStatus.RanToCompletion: 654 { 655 var res = default(TResult); 656 try 657 { 658 res = _parent._resultSelector(value, task.Result); 659 } 660 catch (Exception ex) 661 { 662 lock (_gate) 663 { 664 base._observer.OnError(ex); 665 base.Dispose(); 666 } 667 668 return; 669 } 670 671 lock (_gate) 672 base._observer.OnNext(res); 673 674 OnCompleted(); 675 } 676 break; 677 case TaskStatus.Faulted: 678 { 679 lock (_gate) 680 { 681 base._observer.OnError(task.Exception.InnerException); 682 base.Dispose(); 683 } 684 } 685 break; 686 case TaskStatus.Canceled: 687 { 688 if (!_cancel.IsDisposed) 689 { 690 lock (_gate) 691 { 692 base._observer.OnError(new TaskCanceledException(task)); 693 base.Dispose(); 694 } 695 } 696 } 697 break; 698 } 699 } 700 OnError(Exception error)701 public void OnError(Exception error) 702 { 703 lock (_gate) 704 { 705 base._observer.OnError(error); 706 base.Dispose(); 707 } 708 } 709 OnCompleted()710 public void OnCompleted() 711 { 712 if (Interlocked.Decrement(ref _count) == 0) 713 { 714 lock (_gate) 715 { 716 base._observer.OnCompleted(); 717 base.Dispose(); 718 } 719 } 720 } 721 } 722 723 class Sigma : Sink<TResult>, IObserver<TSource> 724 { 725 private readonly SelectMany<TSource, TCollection, TResult> _parent; 726 Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel)727 public Sigma(SelectMany<TSource, TCollection, TResult> parent, IObserver<TResult> observer, IDisposable cancel) 728 : base(observer, cancel) 729 { 730 _parent = parent; 731 } 732 733 private object _gate; 734 private CancellationDisposable _cancel; 735 private volatile int _count; 736 private int _index; 737 Run()738 public IDisposable Run() 739 { 740 _gate = new object(); 741 _cancel = 
new CancellationDisposable(); 742 _count = 1; 743 744 return new CompositeDisposable(_parent._source.SubscribeSafe(this), _cancel); 745 } 746 OnNext(TSource value)747 public void OnNext(TSource value) 748 { 749 var index = checked(_index++); 750 751 var task = default(Task<TCollection>); 752 try 753 { 754 Interlocked.Increment(ref _count); 755 task = _parent._collectionSelectorTI(value, index, _cancel.Token); 756 } 757 catch (Exception ex) 758 { 759 lock (_gate) 760 { 761 base._observer.OnError(ex); 762 base.Dispose(); 763 } 764 765 return; 766 } 767 768 if (task.IsCompleted) 769 { 770 OnCompletedTask(value, index, task); 771 } 772 else 773 { 774 AttachContinuation(value, index, task); 775 } 776 } 777 AttachContinuation(TSource value, int index, Task<TCollection> task)778 private void AttachContinuation(TSource value, int index, Task<TCollection> task) 779 { 780 // 781 // Separate method to avoid closure in synchronous completion case. 782 // 783 task.ContinueWith(t => OnCompletedTask(value, index, t)); 784 } 785 OnCompletedTask(TSource value, int index, Task<TCollection> task)786 private void OnCompletedTask(TSource value, int index, Task<TCollection> task) 787 { 788 switch (task.Status) 789 { 790 case TaskStatus.RanToCompletion: 791 { 792 var res = default(TResult); 793 try 794 { 795 res = _parent._resultSelectorTI(value, index, task.Result); 796 } 797 catch (Exception ex) 798 { 799 lock (_gate) 800 { 801 base._observer.OnError(ex); 802 base.Dispose(); 803 } 804 805 return; 806 } 807 808 lock (_gate) 809 base._observer.OnNext(res); 810 811 OnCompleted(); 812 } 813 break; 814 case TaskStatus.Faulted: 815 { 816 lock (_gate) 817 { 818 base._observer.OnError(task.Exception.InnerException); 819 base.Dispose(); 820 } 821 } 822 break; 823 case TaskStatus.Canceled: 824 { 825 if (!_cancel.IsDisposed) 826 { 827 lock (_gate) 828 { 829 base._observer.OnError(new TaskCanceledException(task)); 830 base.Dispose(); 831 } 832 } 833 } 834 break; 835 } 836 } 837 OnError(Exception 
error)838 public void OnError(Exception error) 839 { 840 lock (_gate) 841 { 842 base._observer.OnError(error); 843 base.Dispose(); 844 } 845 } 846 OnCompleted()847 public void OnCompleted() 848 { 849 if (Interlocked.Decrement(ref _count) == 0) 850 { 851 lock (_gate) 852 { 853 base._observer.OnCompleted(); 854 base.Dispose(); 855 } 856 } 857 } 858 } 859 #pragma warning restore 0420 860 #endif 861 } 862 863 class SelectMany<TSource, TResult> : Producer<TResult> 864 { 865 private readonly IObservable<TSource> _source; 866 private readonly Func<TSource, IObservable<TResult>> _selector; 867 private readonly Func<TSource, int, IObservable<TResult>> _selectorI; 868 private readonly Func<Exception, IObservable<TResult>> _selectorOnError; 869 private readonly Func<IObservable<TResult>> _selectorOnCompleted; 870 private readonly Func<TSource, IEnumerable<TResult>> _selectorE; 871 private readonly Func<TSource, int, IEnumerable<TResult>> _selectorEI; 872 SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)873 public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector) 874 { 875 _source = source; 876 _selector = selector; 877 } 878 SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)879 public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) 880 { 881 _source = source; 882 _selectorI = selector; 883 } 884 SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)885 public SelectMany(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) 886 { 887 _source = source; 888 _selector = selector; 889 _selectorOnError = selectorOnError; 890 _selectorOnCompleted = selectorOnCompleted; 891 } 
892 SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted)893 public SelectMany(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector, Func<Exception, IObservable<TResult>> selectorOnError, Func<IObservable<TResult>> selectorOnCompleted) 894 { 895 _source = source; 896 _selectorI = selector; 897 _selectorOnError = selectorOnError; 898 _selectorOnCompleted = selectorOnCompleted; 899 } 900 SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector)901 public SelectMany(IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector) 902 { 903 _source = source; 904 _selectorE = selector; 905 } 906 SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector)907 public SelectMany(IObservable<TSource> source, Func<TSource, int, IEnumerable<TResult>> selector) 908 { 909 _source = source; 910 _selectorEI = selector; 911 } 912 913 #if !NO_TPL 914 private readonly Func<TSource, CancellationToken, Task<TResult>> _selectorT; 915 private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selectorTI; 916 SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)917 public SelectMany(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector) 918 { 919 _source = source; 920 _selectorT = selector; 921 } 922 SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)923 public SelectMany(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector) 924 { 925 _source = source; 926 _selectorTI = selector; 927 } 928 #endif 929 Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)930 protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, 
Action<IDisposable> setSink) 931 { 932 if (_selector != null) 933 { 934 var sink = new _(this, observer, cancel); 935 setSink(sink); 936 return sink.Run(); 937 } 938 else if (_selectorI != null) 939 { 940 var sink = new IndexSelectorImpl(this, observer, cancel); 941 setSink(sink); 942 return sink.Run(); 943 } 944 #if !NO_TPL 945 else if (_selectorT != null) 946 { 947 var sink = new SelectManyImpl(this, observer, cancel); 948 setSink(sink); 949 return sink.Run(); 950 } 951 else if (_selectorTI != null) 952 { 953 var sink = new Sigma(this, observer, cancel); 954 setSink(sink); 955 return sink.Run(); 956 } 957 #endif 958 else if (_selectorE != null) 959 { 960 var sink = new NoSelectorImpl(this, observer, cancel); 961 setSink(sink); 962 return _source.SubscribeSafe(sink); 963 } 964 else 965 { 966 var sink = new Omega(this, observer, cancel); 967 setSink(sink); 968 return _source.SubscribeSafe(sink); 969 } 970 } 971 972 class _ : Sink<TResult>, IObserver<TSource> 973 { 974 private readonly SelectMany<TSource, TResult> _parent; 975 _(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)976 public _(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel) 977 : base(observer, cancel) 978 { 979 _parent = parent; 980 } 981 982 private object _gate; 983 private bool _isStopped; 984 private CompositeDisposable _group; 985 private SingleAssignmentDisposable _sourceSubscription; 986 Run()987 public IDisposable Run() 988 { 989 _gate = new object(); 990 _isStopped = false; 991 _group = new CompositeDisposable(); 992 993 _sourceSubscription = new SingleAssignmentDisposable(); 994 _group.Add(_sourceSubscription); 995 _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this); 996 997 return _group; 998 } 999 OnNext(TSource value)1000 public void OnNext(TSource value) 1001 { 1002 var inner = default(IObservable<TResult>); 1003 1004 try 1005 { 1006 inner = _parent._selector(value); 1007 } 1008 catch (Exception ex) 
1009 { 1010 lock (_gate) 1011 { 1012 base._observer.OnError(ex); 1013 base.Dispose(); 1014 } 1015 return; 1016 } 1017 1018 SubscribeInner(inner); 1019 } 1020 OnError(Exception error)1021 public void OnError(Exception error) 1022 { 1023 if (_parent._selectorOnError != null) 1024 { 1025 var inner = default(IObservable<TResult>); 1026 1027 try 1028 { 1029 inner = _parent._selectorOnError(error); 1030 } 1031 catch (Exception ex) 1032 { 1033 lock (_gate) 1034 { 1035 base._observer.OnError(ex); 1036 base.Dispose(); 1037 } 1038 return; 1039 } 1040 1041 SubscribeInner(inner); 1042 1043 Final(); 1044 } 1045 else 1046 { 1047 lock (_gate) 1048 { 1049 base._observer.OnError(error); 1050 base.Dispose(); 1051 } 1052 } 1053 } 1054 OnCompleted()1055 public void OnCompleted() 1056 { 1057 if (_parent._selectorOnCompleted != null) 1058 { 1059 var inner = default(IObservable<TResult>); 1060 1061 try 1062 { 1063 inner = _parent._selectorOnCompleted(); 1064 } 1065 catch (Exception ex) 1066 { 1067 lock (_gate) 1068 { 1069 base._observer.OnError(ex); 1070 base.Dispose(); 1071 } 1072 return; 1073 } 1074 1075 SubscribeInner(inner); 1076 } 1077 1078 Final(); 1079 } 1080 Final()1081 private void Final() 1082 { 1083 _isStopped = true; 1084 if (_group.Count == 1) 1085 { 1086 // 1087 // Notice there can be a race between OnCompleted of the source and any 1088 // of the inner sequences, where both see _group.Count == 1, and one is 1089 // waiting for the lock. There won't be a double OnCompleted observation 1090 // though, because the call to Dispose silences the observer by swapping 1091 // in a NopObserver<T>. 
//
                    lock (_gate)
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                    }
                }
                else
                {
                    _sourceSubscription.Dispose();
                }
            }

            /// <summary>
            /// Adds a new subscription slot to the group and then subscribes to the given
            /// inner sequence, routing its notifications through an <see cref="Iter"/>.
            /// </summary>
            /// <param name="inner">Inner sequence to subscribe to.</param>
            private void SubscribeInner(IObservable<TResult> inner)
            {
                var slot = new SingleAssignmentDisposable();

                // The slot is added to the group before the subscription itself is made.
                _group.Add(slot);

                var forwarder = new Iter(this, slot);
                slot.Disposable = inner.SubscribeSafe(forwarder);
            }

            /// <summary>
            /// Observer attached to one inner sequence. It relays notifications to the
            /// parent's observer while holding the parent's gate.
            /// </summary>
            class Iter : IObserver<TResult>
            {
                private readonly _ _parent;
                private readonly IDisposable _self;

                /// <param name="parent">Sink whose observer receives the relayed notifications.</param>
                /// <param name="self">Disposable removed from the parent's group when this inner sequence completes.</param>
                public Iter(_ parent, IDisposable self)
                {
                    _parent = parent;
                    _self = self;
                }

                /// <summary>Forwards an element to the parent's observer under the gate.</summary>
                public void OnNext(TResult value)
                {
                    var sink = _parent;

                    lock (sink._gate)
                    {
                        sink._observer.OnNext(value);
                    }
                }

                /// <summary>Forwards the error under the gate and disposes the parent sink.</summary>
                public void OnError(Exception error)
                {
                    var sink = _parent;

                    lock (sink._gate)
                    {
                        sink._observer.OnError(error);
                        sink.Dispose();
                    }
                }

                /// <summary>
                /// Removes this inner subscription from the group and, when the parent is
                /// marked as stopped and a single entry is left, completes the observer.
                /// </summary>
                public void OnCompleted()
                {
                    var sink = _parent;

                    sink._group.Remove(_self);

                    if (!sink._isStopped || sink._group.Count != 1)
                    {
                        return;
                    }

                    //
                    // OnCompleted of the source and of any inner sequence can race: both may
                    // see _group.Count == 1 while one of them is waiting for the lock. The
                    // observer still sees a single OnCompleted, because the call to Dispose
                    // silences it by swapping in a NopObserver<T>.
                    //
                    lock (sink._gate)
                    {
                        sink._observer.OnCompleted();
                        sink.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Sink that projects every source element, together with a running element index,
        /// to an inner observable sequence and forwards the elements of all inner sequences.
        /// Optional selectors on the parent map the source's error and completion to one
        /// last inner sequence.
        /// </summary>
        class IndexSelectorImpl : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TResult> _parent;

            private object _gate;
            private bool _isStopped;
            private CompositeDisposable _group;
            private SingleAssignmentDisposable _sourceSubscription;
            private int _index;

            /// <param name="parent">Operator instance holding the source and the selectors.</param>
            /// <param name="observer">Observer passed to the base sink.</param>
            /// <param name="cancel">Disposable passed to the base sink.</param>
            public IndexSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Initializes the sink's state and subscribes to the source.
            /// </summary>
            /// <returns>The group holding the source subscription and all inner subscriptions.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _isStopped = false;
                _group = new CompositeDisposable();

                // The source subscription is the first entry of the group.
                _sourceSubscription = new SingleAssignmentDisposable();
                _group.Add(_sourceSubscription);
                _sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

                return _group;
            }

            /// <summary>
            /// Projects the element and its index to an inner sequence and subscribes to it.
            /// An exception thrown while doing so (including overflow of the checked index
            /// increment) is forwarded as an error, after which the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                IObservable<TResult> innerSequence;

                try
                {
                    innerSequence = _parent._selectorI(value, checked(_index++));
                }
                catch (Exception failure)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(failure);
                        base.Dispose();
                    }

                    return;
                }

                SubscribeInner(innerSequence);
            }

            /// <summary>
            /// Without an error selector on the parent, forwards the error and disposes the
            /// sink. With one, maps the error to a last inner sequence, subscribes to it and
            /// then runs <see cref="Final"/>.
            /// </summary>
            public void OnError(Exception error)
            {
                if (_parent._selectorOnError == null)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(error);
                        base.Dispose();
                    }

                    return;
                }

                IObservable<TResult> fallback;

                try
                {
                    fallback = _parent._selectorOnError(error);
                }
                catch (Exception failure)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(failure);
                        base.Dispose();
                    }

                    return;
                }

                SubscribeInner(fallback);

                Final();
            }

            /// <summary>
            /// Subscribes to the sequence produced by the parent's completion selector, if
            /// there is one, and then runs <see cref="Final"/>.
            /// </summary>
            public void OnCompleted()
            {
                if (_parent._selectorOnCompleted != null)
                {
                    IObservable<TResult> epilogue;

                    try
                    {
                        epilogue = _parent._selectorOnCompleted();
                    }
                    catch (Exception failure)
                    {
                        lock (_gate)
                        {
                            base._observer.OnError(failure);
                            base.Dispose();
                        }

                        return;
                    }

                    SubscribeInner(epilogue);
                }

                Final();
            }

            /// <summary>
            /// Marks the source as stopped. If the group holds a single entry, the observer
            /// is completed and the sink disposed; otherwise only the source subscription
            /// is disposed.
            /// </summary>
            private void Final()
            {
                _isStopped = true;

                if (_group.Count != 1)
                {
                    _sourceSubscription.Dispose();
                    return;
                }

                //
                // OnCompleted of the source and of any inner sequence can race: both may see
                // _group.Count == 1 while one of them is waiting for the lock. The observer
                // still sees a single OnCompleted, because the call to Dispose silences it
                // by swapping in a NopObserver<T>.
                //
                lock (_gate)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }

            /// <summary>
            /// Adds a new subscription slot to the group and then subscribes to the given
            /// inner sequence, routing its notifications through an <see cref="Iter"/>.
            /// </summary>
            /// <param name="inner">Inner sequence to subscribe to.</param>
            private void SubscribeInner(IObservable<TResult> inner)
            {
                var slot = new SingleAssignmentDisposable();

                // The slot is added to the group before the subscription itself is made.
                _group.Add(slot);

                var forwarder = new Iter(this, slot);
                slot.Disposable = inner.SubscribeSafe(forwarder);
            }

            /// <summary>
            /// Observer attached to one inner sequence. It relays notifications to the
            /// parent's observer while holding the parent's gate.
            /// </summary>
            class Iter : IObserver<TResult>
            {
                private readonly IndexSelectorImpl _parent;
                private readonly IDisposable _self;

                /// <param name="parent">Sink whose observer receives the relayed notifications.</param>
                /// <param name="self">Disposable removed from the parent's group when this inner sequence completes.</param>
                public Iter(IndexSelectorImpl parent, IDisposable self)
                {
                    _parent = parent;
                    _self = self;
                }

                /// <summary>Forwards an element to the parent's observer under the gate.</summary>
                public void OnNext(TResult value)
                {
                    var sink = _parent;

                    lock (sink._gate)
                    {
                        sink._observer.OnNext(value);
                    }
                }

                /// <summary>Forwards the error under the gate and disposes the parent sink.</summary>
                public void OnError(Exception error)
                {
                    var sink = _parent;

                    lock (sink._gate)
                    {
                        sink._observer.OnError(error);
                        sink.Dispose();
                    }
                }

                /// <summary>
                /// Removes this inner subscription from the group and, when the parent is
                /// marked as stopped and a single entry is left, completes the observer.
                /// </summary>
                public void OnCompleted()
                {
                    var sink = _parent;

                    sink._group.Remove(_self);

                    if (!sink._isStopped || sink._group.Count != 1)
                    {
                        return;
                    }

                    //
                    // OnCompleted of the source and of any inner sequence can race: both may
                    // see _group.Count == 1 while one of them is waiting for the lock. The
                    // observer still sees a single OnCompleted, because the call to Dispose
                    // silences it by swapping in a NopObserver<T>.
                    //
                    lock (sink._gate)
                    {
                        sink._observer.OnCompleted();
                        sink.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Sink that projects every source element to an enumerable sequence and forwards
        /// the enumerated elements to the observer from within <see cref="OnNext"/>.
        /// </summary>
        class NoSelectorImpl : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TResult> _parent;

            /// <param name="parent">Operator instance holding the selector.</param>
            /// <param name="observer">Observer passed to the base sink.</param>
            /// <param name="cancel">Disposable passed to the base sink.</param>
            public NoSelectorImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Projects the element to an enumerable and forwards each enumerated item.
            /// An exception from the selector, from GetEnumerator, or from MoveNext/Current
            /// is forwarded as an error, after which the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                IEnumerable<TResult> items;

                try
                {
                    items = _parent._selectorE(value);
                }
                catch (Exception failure)
                {
                    base._observer.OnError(failure);
                    base.Dispose();
                    return;
                }

                IEnumerator<TResult> enumerator;

                try
                {
                    enumerator = items.GetEnumerator();
                }
                catch (Exception failure)
                {
                    base._observer.OnError(failure);
                    base.Dispose();
                    return;
                }

                // The using statement disposes a non-null enumerator on every exit path.
                using (enumerator)
                {
                    while (true)
                    {
                        bool advanced;
                        var current = default(TResult);

                        // Only MoveNext/Current are guarded; the observer call is made
                        // outside of this try block.
                        try
                        {
                            advanced = enumerator.MoveNext();
                            if (advanced)
                            {
                                current = enumerator.Current;
                            }
                        }
                        catch (Exception failure)
                        {
                            base._observer.OnError(failure);
                            base.Dispose();
                            return;
                        }

                        if (!advanced)
                        {
                            break;
                        }

                        base._observer.OnNext(current);
                    }
                }
            }

            /// <summary>Forwards the source's error and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Forwards the source's completion and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }
        }

        /// <summary>
        /// Sink that projects every source element, together with a running element index,
        /// to an enumerable sequence and forwards the enumerated elements to the observer
        /// from within <see cref="OnNext"/>.
        /// </summary>
        class Omega : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TResult> _parent;

            private int _index;

            /// <param name="parent">Operator instance holding the selector.</param>
            /// <param name="observer">Observer passed to the base sink.</param>
            /// <param name="cancel">Disposable passed to the base sink.</param>
            public Omega(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Projects the element and its index to an enumerable and forwards each
            /// enumerated item. An exception from the selector (including overflow of the
            /// checked index increment), from GetEnumerator, or from MoveNext/Current is
            /// forwarded as an error, after which the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                IEnumerable<TResult> items;

                try
                {
                    items = _parent._selectorEI(value, checked(_index++));
                }
                catch (Exception failure)
                {
                    base._observer.OnError(failure);
                    base.Dispose();
                    return;
                }

                IEnumerator<TResult> enumerator;

                try
                {
                    enumerator = items.GetEnumerator();
                }
                catch (Exception failure)
                {
                    base._observer.OnError(failure);
                    base.Dispose();
                    return;
                }

                // The using statement disposes a non-null enumerator on every exit path.
                using (enumerator)
                {
                    while (true)
                    {
                        bool advanced;
                        var current = default(TResult);

                        // Only MoveNext/Current are guarded; the observer call is made
                        // outside of this try block.
                        try
                        {
                            advanced = enumerator.MoveNext();
                            if (advanced)
                            {
                                current = enumerator.Current;
                            }
                        }
                        catch (Exception failure)
                        {
                            base._observer.OnError(failure);
                            base.Dispose();
                            return;
                        }

                        if (!advanced)
                        {
                            break;
                        }

                        base._observer.OnNext(current);
                    }
                }
            }

            /// <summary>Forwards the source's error and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            /// <summary>Forwards the source's completion and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }
        }

#if !NO_TPL
#pragma warning disable 0420
        /// <summary>
        /// Sink that projects every source element to a task and forwards each task's
        /// result. A counter that starts at 1 and is incremented per source element tracks
        /// outstanding work; the observer is completed when it drops to zero.
        /// </summary>
        class SelectManyImpl : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TResult> _parent;

            private object _gate;
            private CancellationDisposable _cancel;
            private volatile int _count;

            /// <param name="parent">Operator instance holding the source and the selector.</param>
            /// <param name="observer">Observer passed to the base sink.</param>
            /// <param name="cancel">Disposable passed to the base sink.</param>
            public SelectManyImpl(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Initializes the sink's state and subscribes to the source.
            /// </summary>
            /// <returns>A disposable combining the source subscription and the cancellation disposable.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _cancel = new CancellationDisposable();
                _count = 1;

                var subscription = _parent._source.SubscribeSafe(this);
                return new CompositeDisposable(subscription, _cancel);
            }

            /// <summary>
            /// Increments the outstanding-work counter and starts the task for the element.
            /// A task that is already completed is handled inline; otherwise a continuation
            /// is attached. An exception from the selector is forwarded as an error, after
            /// which the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                Task<TResult> pending;

                try
                {
                    Interlocked.Increment(ref _count);
                    pending = _parent._selectorT(value, _cancel.Token);
                }
                catch (Exception failure)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(failure);
                        base.Dispose();
                    }

                    return;
                }

                if (!pending.IsCompleted)
                {
                    pending.ContinueWith(OnCompletedTask);
                    return;
                }

                OnCompletedTask(pending);
            }

            /// <summary>
            /// Handles a finished task: a result is forwarded and the counter decremented;
            /// a fault is forwarded as the inner exception; a cancellation is forwarded as
            /// a <see cref="TaskCanceledException"/> unless the cancellation disposable has
            /// already been disposed. Any other status is ignored.
            /// </summary>
            private void OnCompletedTask(Task<TResult> task)
            {
                var outcome = task.Status;

                if (outcome == TaskStatus.RanToCompletion)
                {
                    lock (_gate)
                    {
                        base._observer.OnNext(task.Result);
                    }

                    OnCompleted();
                }
                else if (outcome == TaskStatus.Faulted)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(task.Exception.InnerException);
                        base.Dispose();
                    }
                }
                else if (outcome == TaskStatus.Canceled)
                {
                    if (!_cancel.IsDisposed)
                    {
                        lock (_gate)
                        {
                            base._observer.OnError(new TaskCanceledException(task));
                            base.Dispose();
                        }
                    }
                }
            }

            /// <summary>Forwards the source's error under the gate and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Decrements the outstanding-work counter; when it reaches zero the observer
            /// is completed and the sink disposed. Called for the source's completion and
            /// after each task result has been forwarded.
            /// </summary>
            public void OnCompleted()
            {
                if (Interlocked.Decrement(ref _count) != 0)
                {
                    return;
                }

                lock (_gate)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }

        /// <summary>
        /// Sink that projects every source element, together with a running element index,
        /// to a task and forwards each task's result. A counter that starts at 1 and is
        /// incremented per source element tracks outstanding work; the observer is
        /// completed when it drops to zero.
        /// </summary>
        class Sigma : Sink<TResult>, IObserver<TSource>
        {
            private readonly SelectMany<TSource, TResult> _parent;

            private object _gate;
            private CancellationDisposable _cancel;
            private volatile int _count;
            private int _index;

            /// <param name="parent">Operator instance holding the source and the selector.</param>
            /// <param name="observer">Observer passed to the base sink.</param>
            /// <param name="cancel">Disposable passed to the base sink.</param>
            public Sigma(SelectMany<TSource, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            /// <summary>
            /// Initializes the sink's state and subscribes to the source.
            /// </summary>
            /// <returns>A disposable combining the source subscription and the cancellation disposable.</returns>
            public IDisposable Run()
            {
                _gate = new object();
                _cancel = new CancellationDisposable();
                _count = 1;

                var subscription = _parent._source.SubscribeSafe(this);
                return new CompositeDisposable(subscription, _cancel);
            }

            /// <summary>
            /// Increments the outstanding-work counter and starts the task for the element
            /// and its index. A task that is already completed is handled inline; otherwise
            /// a continuation is attached. An exception from the selector (including
            /// overflow of the checked index increment) is forwarded as an error, after
            /// which the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                Task<TResult> pending;

                try
                {
                    Interlocked.Increment(ref _count);
                    pending = _parent._selectorTI(value, checked(_index++), _cancel.Token);
                }
                catch (Exception failure)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(failure);
                        base.Dispose();
                    }

                    return;
                }

                if (!pending.IsCompleted)
                {
                    pending.ContinueWith(OnCompletedTask);
                    return;
                }

                OnCompletedTask(pending);
            }

            /// <summary>
            /// Handles a finished task: a result is forwarded and the counter decremented;
            /// a fault is forwarded as the inner exception; a cancellation is forwarded as
            /// a <see cref="TaskCanceledException"/> unless the cancellation disposable has
            /// already been disposed. Any other status is ignored.
            /// </summary>
            private void OnCompletedTask(Task<TResult> task)
            {
                var outcome = task.Status;

                if (outcome == TaskStatus.RanToCompletion)
                {
                    lock (_gate)
                    {
                        base._observer.OnNext(task.Result);
                    }

                    OnCompleted();
                }
                else if (outcome == TaskStatus.Faulted)
                {
                    lock (_gate)
                    {
                        base._observer.OnError(task.Exception.InnerException);
                        base.Dispose();
                    }
                }
                else if (outcome == TaskStatus.Canceled)
                {
                    if (!_cancel.IsDisposed)
                    {
                        lock (_gate)
                        {
                            base._observer.OnError(new TaskCanceledException(task));
                            base.Dispose();
                        }
                    }
                }
            }

            /// <summary>Forwards the source's error under the gate and disposes the sink.</summary>
            public void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>
            /// Decrements the outstanding-work counter; when it reaches zero the observer
            /// is completed and the sink disposed. Called for the source's completion and
            /// after each task result has been forwarded.
            /// </summary>
            public void OnCompleted()
            {
                if (Interlocked.Decrement(ref _count) != 0)
                {
                    return;
                }

                lock (_gate)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }
#pragma warning restore 0420
#endif
    }
}
#endif