1import { Operator } from '../Operator';
2import { Subscriber } from '../Subscriber';
3import { Observable } from '../Observable';
4
5import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
6import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
7
/* tslint:disable:max-line-length */
export function catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
/* tslint:enable:max-line-length */

/**
 * Handles an error notification from the source by continuing with the observable input
 * returned from `selector`. If `selector` itself throws, the thrown value is sent downstream
 * as the error instead.
 *
 * ![](catch.png)
 *
 * ## Examples
 * Continues with a different Observable when there's an error
 *
 * ```ts
 * import { of } from 'rxjs';
 * import { map, catchError } from 'rxjs/operators';
 *
 * of(1, 2, 3, 4, 5).pipe(
 *     map(n => {
 *       if (n === 4) {
 *         throw 'four!';
 *       }
 *       return n;
 *     }),
 *     catchError(err => of('I', 'II', 'III', 'IV', 'V')),
 *   )
 *   .subscribe(x => console.log(x));
 *   // 1, 2, 3, I, II, III, IV, V
 * ```
 *
 * Retries the caught source Observable again in case of error, similar to retry() operator
 *
 * ```ts
 * import { of } from 'rxjs';
 * import { map, catchError, take } from 'rxjs/operators';
 *
 * of(1, 2, 3, 4, 5).pipe(
 *     map(n => {
 *       if (n === 4) {
 *         throw 'four!';
 *       }
 *       return n;
 *     }),
 *     catchError((err, caught) => caught),
 *     take(30),
 *   )
 *   .subscribe(x => console.log(x));
 *   // 1, 2, 3, 1, 2, 3, ...
 * ```
 *
 * Throws a new error when the source Observable throws an error
 *
 * ```ts
 * import { of } from 'rxjs';
 * import { map, catchError } from 'rxjs/operators';
 *
 * of(1, 2, 3, 4, 5).pipe(
 *     map(n => {
 *       if (n === 4) {
 *         throw 'four!';
 *       }
 *       return n;
 *     }),
 *     catchError(err => {
 *       throw 'error in source. Details: ' + err;
 *     }),
 *   )
 *   .subscribe(
 *     x => console.log(x),
 *     err => console.log(err)
 *   );
 *   // 1, 2, 3, error in source. Details: four!
 * ```
 *
 * @param {function} selector a function that receives `err`, the error that was raised, and `caught`,
 *  the observable produced by applying this operator to the source. Returning `caught` subscribes to the
 *  source again with the same error handling in place (a "retry"). Whatever observable input the
 *  `selector` returns is used to continue the observable chain.
 * @return {Observable} A function that returns an observable which originates from either the source
 *  or the observable returned by the catch `selector` function.
 * @name catchError
 */
export function catchError<T, O extends ObservableInput<any>>(
  selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>> {
  return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | ObservedValueOf<O>> {
    const catchOperator = new CatchOperator(selector);
    const lifted = source.lift(catchOperator);
    // The operator can only learn about the lifted observable after `lift`
    // has produced it, so the reference is patched in afterwards. That same
    // observable is what `selector` later receives as `caught`.
    catchOperator.caught = lifted as Observable<T>;
    return lifted as Observable<T>;
  };
}
97
/**
 * Operator handed to `source.lift(...)` by `catchError`. It carries the
 * user's `selector` plus a reference to the lifted observable (`caught`) and
 * wires a `CatchSubscriber` between the source and the downstream subscriber.
 */
class CatchOperator<T, R> implements Operator<T, T | R> {
  // Assigned by `catchError` right after `lift` returns; it is not known at
  // construction time.
  caught: Observable<T>;

  constructor(private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>) {}

  /**
   * Subscribes to `source` through a new `CatchSubscriber` that forwards to `subscriber`.
   *
   * @param subscriber the downstream subscriber.
   * @param source the object to subscribe to.
   * @return whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<R>, source: any): any {
    const { selector, caught } = this;
    const catchSubscriber = new CatchSubscriber(subscriber, selector, caught);
    return source.subscribe(catchSubscriber);
  }
}
108
/**
 * Subscriber that intercepts an error from the source and carries on with the
 * observable input returned by `selector`.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class CatchSubscriber<T, R> extends SimpleOuterSubscriber<T, T | R> {
  constructor(destination: Subscriber<any>,
              private selector: (err: any, caught: Observable<T>) => ObservableInput<T | R>,
              private caught: Observable<T>) {
    super(destination);
  }

  // NOTE: `error` is overridden rather than `_error` so that handling the
  // error does not flag this subscriber as `isStopped`. This mirrors the
  // RetrySubscriber (from the `retry` operator): unsubscribe from the source
  // chain, reset the Subscriber flags, then subscribe to the selector result.
  error(err: any) {
    if (this.isStopped) {
      return;
    }

    let replacement: any;
    try {
      replacement = this.selector(err, this.caught);
    } catch (selectorError) {
      // The selector failed: report its error through the regular error path.
      super.error(selectorError);
      return;
    }

    this._unsubscribeAndRecycle();
    const inner = new SimpleInnerSubscriber(this);
    this.add(inner);
    const subscription = innerSubscribe(replacement, inner);
    // `innerSubscribe` usually hands back the very subscriber it was given.
    // Interop subscribers get wrapped, though, and the wrapper has to be
    // tracked as well for unsubscription to chain correctly.
    if (subscription !== inner) {
      this.add(subscription);
    }
  }
}
148