import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';

import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';

/* tslint:disable:max-line-length */
export function catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
/* tslint:enable:max-line-length */

/**
 * Catches errors on the observable to be handled by returning a new observable or throwing an error.
 *
 * ![](catch.png)
 *
 * ## Examples
 * Continues with a different Observable when there's an error
 *
 * ```ts
 * import { of } from 'rxjs';
 * import { map, catchError } from 'rxjs/operators';
 *
 * of(1, 2, 3, 4, 5).pipe(
 *     map(n => {
 *   	   if (n === 4) {
 * 	       throw 'four!';
 *       }
 *	     return n;
 *     }),
 *     catchError(err => of('I', 'II', 'III', 'IV', 'V')),
 *   )
 *   .subscribe(x => console.log(x));
 *   // 1, 2, 3, I, II, III, IV, V
 * ```
 *
 * Retries the caught source Observable again in case of error, similar to retry() operator
 *
 * ```ts
 * import { of } from 'rxjs';
 * import { map, catchError, take } from 'rxjs/operators';
 *
 * of(1, 2, 3, 4, 5).pipe(
 *     map(n => {
 *   	   if (n === 4) {
 *   	     throw 'four!';
 *       }
 * 	     return n;
 *     }),
 *     catchError((err, caught) => caught),
 *     take(30),
 *   )
 *   .subscribe(x => console.log(x));
 *   // 1, 2, 3, 1, 2, 3, ...
 * ```
 *
 * Throws a new error when the source Observable throws an error
 *
 * ```ts
 * import { of } from 'rxjs';
 * import { map, catchError } from 'rxjs/operators';
 *
 * of(1, 2, 3, 4, 5).pipe(
 *     map(n => {
 *       if (n === 4) {
 *         throw 'four!';
 *       }
 *       return n;
 *     }),
 *     catchError(err => {
 *       throw 'error in source. Details: ' + err;
 *     }),
 *   )
 *   .subscribe(
 *     x => console.log(x),
 *     err => console.log(err)
 *   );
 *   // 1, 2, 3, error in source. Details: four!
 * ```
 *
 *  @param {function} selector a function that takes as arguments `err`, which is the error, and `caught`, which
 *  is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable
 *  is returned by the `selector` will be used to continue the observable chain.
 * @return {Observable} An observable that originates from either the source or the observable returned by the
 *  catch `selector` function.
 * @name catchError
 */
export function catchError<T, O extends ObservableInput<any>>(
  selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>> {
  return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | ObservedValueOf<O>> {
    const operator = new CatchOperator(selector);
    const caught = source.lift(operator);
    // The operator needs a reference to the lifted observable so it can hand
    // it to `selector` as the `caught` argument; it only exists after `lift`,
    // hence the assignment here rather than in the constructor.
    return (operator.caught = caught as Observable<T>);
  };
}

/**
 * Operator that subscribes to the source through a `CatchSubscriber`,
 * passing along the `selector` and the `caught` observable.
 */
class CatchOperator<T, R> implements Operator<T, T | R> {
  /** The lifted observable; assigned by `catchError` right after `source.lift(...)`. */
  caught: Observable<T>;

  constructor(private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>) {
  }

  /**
   * Subscribes `source` with a new `CatchSubscriber` wrapping `subscriber`.
   * @param subscriber the downstream subscriber
   * @param source the upstream source to subscribe to
   * @return whatever `source.subscribe` returns
   */
  call(subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new CatchSubscriber(subscriber, this.selector, this.caught));
  }
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class CatchSubscriber<T, R> extends SimpleOuterSubscriber<T, T | R> {
  constructor(destination: Subscriber<any>,
              private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>,
              private caught: Observable<T>) {
    super(destination);
  }

  // NOTE: overriding `error` instead of `_error` because we don't want
  // to have this flag this subscriber as `isStopped`. We can mimic the
  // behavior of the RetrySubscriber (from the `retry` operator), where
  // we unsubscribe from our source chain, reset our Subscriber flags,
  // then subscribe to the selector result.
  error(err: any): void {
    if (!this.isStopped) {
      let result: any;
      try {
        result = this.selector(err, this.caught);
      } catch (err2) {
        // The selector itself threw: forward that error through the base
        // class instead of attempting to continue.
        super.error(err2);
        return;
      }
      this._unsubscribeAndRecycle();
      const innerSubscriber = new SimpleInnerSubscriber(this);
      this.add(innerSubscriber);
      const innerSubscription = innerSubscribe(result, innerSubscriber);
      // The returned subscription will usually be the subscriber that was
      // passed. However, interop subscribers will be wrapped and for
      // unsubscriptions to chain correctly, the wrapper needs to be added, too.
      if (innerSubscription !== innerSubscriber) {
        this.add(innerSubscription);
      }
    }
  }
}