// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that holds an enumerable of observable sources and runs them through
    /// a sink which reacts to both <c>OnError</c> and <c>OnCompleted</c> by calling
    /// the base sink's <c>_recurse</c> delegate instead of notifying the observer.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequences.</typeparam>
    class OnErrorResumeNext<TSource> : Producer<TSource>
    {
        // The sources to run; also read by the nested sink's Extract for nested operators.
        private readonly IEnumerable<IObservable<TSource>> _sources;

        /// <summary>
        /// Stores the sources; nothing is enumerated or subscribed to here.
        /// </summary>
        /// <param name="sources">Observable sequences handed to the sink when the producer is run.</param>
        public OnErrorResumeNext(IEnumerable<IObservable<TSource>> sources)
        {
            _sources = sources;
        }

        /// <summary>
        /// Creates the sink, publishes it through <paramref name="setSink"/>, and starts it
        /// over the stored sources.
        /// </summary>
        /// <param name="observer">Observer that receives the forwarded elements.</param>
        /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
        /// <param name="setSink">Callback that receives the newly created sink.</param>
        /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);

            // The sink is registered before it is started.
            setSink(sink);

            return sink.Run(_sources);
        }

        /// <summary>
        /// Sink that forwards elements unchanged and treats error and completion identically.
        /// </summary>
        class _ : TailRecursiveSink<TSource>
        {
            /// <summary>
            /// Passes both arguments straight to the base sink.
            /// </summary>
            public _(IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            /// <summary>
            /// Exposes the inner sources of a nested <see cref="OnErrorResumeNext{TSource}"/>.
            /// </summary>
            /// <param name="source">The observable to inspect.</param>
            /// <returns>
            /// The nested operator's sources when <paramref name="source"/> is an
            /// <see cref="OnErrorResumeNext{TSource}"/>; otherwise <c>null</c>.
            /// </returns>
            /// <remarks>
            /// NOTE(review): presumably the base sink uses this to flatten nested operators
            /// rather than subscribing to them recursively - confirm against TailRecursiveSink.
            /// </remarks>
            protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
            {
                var oern = source as OnErrorResumeNext<TSource>;
                if (oern != null)
                {
                    return oern._sources;
                }

                return null;
            }

            /// <summary>
            /// Forwards the element to the downstream observer unchanged.
            /// </summary>
            public override void OnNext(TSource value)
            {
                base._observer.OnNext(value);
            }

            /// <summary>
            /// Drops the error (it is never passed to the observer) and calls <c>_recurse</c>.
            /// </summary>
            /// <param name="error">The error raised by the current source; intentionally unused.</param>
            public override void OnError(Exception error)
            {
                // NOTE(review): _recurse is defined in the base sink; presumably it moves on
                // to the next source - confirm against TailRecursiveSink.
                _recurse();
            }

            /// <summary>
            /// Handles completion exactly like an error: calls <c>_recurse</c> without
            /// notifying the observer.
            /// </summary>
            public override void OnCompleted()
            {
                _recurse();
            }
        }
    }
}
#endif