1import { Operator } from '../Operator'; 2import { Subscriber } from '../Subscriber'; 3import { Observable } from '../Observable'; 4import { Subject } from '../Subject'; 5import { Subscription } from '../Subscription'; 6 7import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; 8import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; 9 10/** 11 * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable 12 * calls `error`, this method will emit the Throwable that caused the error to the Observable returned from `notifier`. 13 * If that Observable calls `complete` or `error` then this method will call `complete` or `error` on the child 14 * subscription. Otherwise this method will resubscribe to the source Observable. 15 * 16 * ![](retryWhen.png) 17 * 18 * @param {function(errors: Observable): Observable} notifier - Receives an Observable of notifications with which a 19 * user can `complete` or `error`, aborting the retry. 20 * @return {Observable} The source Observable modified with retry logic. 21 * @method retryWhen 22 * @owner Observable 23 */ 24export function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T> { 25 return (source: Observable<T>) => source.lift(new RetryWhenOperator(notifier, source)); 26} 27 28class RetryWhenOperator<T> implements Operator<T, T> { 29 constructor(protected notifier: (errors: Observable<any>) => Observable<any>, 30 protected source: Observable<T>) { 31 } 32 33 call(subscriber: Subscriber<T>, source: any): TeardownLogic { 34 return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source)); 35 } 36} 37 38/** 39 * We need this JSDoc comment for affecting ESDoc. 40 * @ignore 41 * @extends {Ignored} 42 */ 43class RetryWhenSubscriber<T, R> extends SimpleOuterSubscriber<T, R> { 44 45 private errors?: Subject<any>; 46 private retries?: Observable<any>; 47 private retriesSubscription?: Subscription; 48 49 constructor(destination: Subscriber<R>, 50 private notifier: (errors: Observable<any>) => Observable<any>, 51 private source: Observable<T>) { 52 super(destination); 53 } 54 55 error(err: any) { 56 if (!this.isStopped) { 57 58 let errors = this.errors; 59 let retries: any = this.retries; 60 let retriesSubscription = this.retriesSubscription; 61 62 if (!retries) { 63 errors = new Subject(); 64 try { 65 const { notifier } = this; 66 retries = notifier(errors); 67 } catch (e) { 68 return super.error(e); 69 } 70 retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this)); 71 } else { 72 this.errors = undefined; 73 this.retriesSubscription = undefined; 74 } 75 76 this._unsubscribeAndRecycle(); 77 78 this.errors = errors; 79 this.retries = retries; 80 this.retriesSubscription = retriesSubscription; 81 82 errors!.next(err); 83 } 84 } 85 86 /** @deprecated This is an internal implementation detail, do not use. */ 87 _unsubscribe() { 88 const { errors, retriesSubscription } = this; 89 if (errors) { 90 errors.unsubscribe(); 91 this.errors = undefined; 92 } 93 if (retriesSubscription) { 94 retriesSubscription.unsubscribe(); 95 this.retriesSubscription = undefined; 96 } 97 this.retries = undefined; 98 } 99 100 notifyNext(): void { 101 const { _unsubscribe } = this; 102 103 this._unsubscribe = null!; 104 this._unsubscribeAndRecycle(); 105 this._unsubscribe = _unsubscribe; 106 107 this.source.subscribe(this); 108 } 109} 110