/** @prettier */
import { Subscription } from './Subscription';
import { Subscriber } from './Subscriber';
import { Observable } from './Observable';
import { subscribeTo } from './util/subscribeTo';

/**
 * The minimal set of callbacks a parent must expose so that a
 * `SimpleInnerSubscriber` can forward inner notifications to it.
 */
interface SimpleOuterSubscriberLike<T> {
  /**
   * Called for each value delivered by the inner subscription.
   * @param innerValue the value nexted by the inner producer
   */
  notifyNext(innerValue: T): void;
  /**
   * Called when the inner subscription reports an error.
   * @param err the error from the inner producer
   */
  notifyError(err: any): void;
  /**
   * Called when the inner subscription reports completion.
   */
  notifyComplete(): void;
}

/**
 * Subscriber attached to an inner source that relays every notification to
 * its parent through the `notify*` callbacks. After relaying an error or a
 * completion it unsubscribes itself.
 */
export class SimpleInnerSubscriber<T> extends Subscriber<T> {
  /**
   * @param parent the object that receives the forwarded notifications
   */
  constructor(private parent: SimpleOuterSubscriberLike<any>) {
    super();
  }

  /** Relays an inner value to the parent. */
  protected _next(value: T): void {
    const { parent } = this;
    parent.notifyNext(value);
  }

  /** Relays an inner error to the parent, then tears this subscriber down. */
  protected _error(error: any): void {
    const { parent } = this;
    parent.notifyError(error);
    this.unsubscribe();
  }

  /** Relays inner completion to the parent, then tears this subscriber down. */
  protected _complete(): void {
    const { parent } = this;
    parent.notifyComplete();
    this.unsubscribe();
  }
}

/**
 * Inner subscriber that, besides the inner value, also hands the parent the
 * outer value and outer index it was created with, plus a reference to itself.
 * After relaying an error or a completion it unsubscribes itself.
 */
export class ComplexInnerSubscriber<T, R> extends Subscriber<R> {
  /**
   * @param parent the outer subscriber that receives the forwarded notifications
   * @param outerValue value passed back to the parent with every inner value
   * @param outerIndex index passed back to the parent with every inner value
   */
  constructor(private parent: ComplexOuterSubscriber<T, R>, public outerValue: T, public outerIndex: number) {
    super();
  }

  /** Relays an inner value together with the stored outer value/index and this subscriber. */
  protected _next(value: R): void {
    const { parent, outerValue, outerIndex } = this;
    parent.notifyNext(outerValue, value, outerIndex, this);
  }

  /** Relays an inner error to the parent, then tears this subscriber down. */
  protected _error(error: any): void {
    const { parent } = this;
    parent.notifyError(error);
    this.unsubscribe();
  }

  /** Relays inner completion (identifying this subscriber) to the parent, then tears this subscriber down. */
  protected _complete(): void {
    const { parent } = this;
    parent.notifyComplete(this);
    this.unsubscribe();
  }
}

/**
 * Outer subscriber whose default `notify*` handlers pass inner notifications
 * straight through to `destination`. Subclasses may override the handlers.
 */
export class SimpleOuterSubscriber<T, R> extends Subscriber<T> implements SimpleOuterSubscriberLike<R> {
  /** Default handling of an inner value: emit it on the destination. */
  notifyNext(innerValue: R): void {
    const { destination } = this;
    destination.next(innerValue);
  }

  /** Default handling of an inner error: send it to the destination. */
  notifyError(err: any): void {
    const { destination } = this;
    destination.error(err);
  }

  /** Default handling of inner completion: complete the destination. */
  notifyComplete(): void {
    const { destination } = this;
    destination.complete();
  }
}

/**
 * DO NOT USE (formerly "OuterSubscriber")
 * TODO: We want to refactor this and remove it. It is retaining values it shouldn't for long
 * periods of time.
 */
export class ComplexOuterSubscriber<T, R> extends Subscriber<T> {
  /**
   * Default handling of an inner value: emit it on the destination. The other
   * arguments are ignored here and exist for overriding subclasses.
   *
   * @param _outerValue Used by: bufferToggle, delayWhen, windowToggle
   * @param innerValue Used by: subclass default, combineLatest, race, bufferToggle, windowToggle, withLatestFrom
   * @param _outerIndex Used by: combineLatest, race, withLatestFrom
   * @param _innerSub Used by: delayWhen
   */
  notifyNext(_outerValue: T, innerValue: R, _outerIndex: number, _innerSub: ComplexInnerSubscriber<T, R>): void {
    const { destination } = this;
    destination.next(innerValue);
  }

  /** Default handling of an inner error: send it to the destination. */
  notifyError(error: any): void {
    const { destination } = this;
    destination.error(error);
  }

  /**
   * Default handling of inner completion: complete the destination.
   *
   * @param _innerSub Used by: race, bufferToggle, delayWhen, windowToggle, windowWhen
   */
  notifyComplete(_innerSub: ComplexInnerSubscriber<T, R>): void {
    const { destination } = this;
    destination.complete();
  }
}

/**
 * Subscribes `innerSubscriber` to `result`.
 *
 * @param result the source to subscribe to; `Observable` instances are
 * subscribed directly, anything else is handed to `subscribeTo`
 * @param innerSubscriber the subscriber that will receive the notifications
 * @returns the resulting subscription, or `undefined` when `innerSubscriber`
 * is already closed (in which case nothing is subscribed)
 */
export function innerSubscribe(result: any, innerSubscriber: Subscriber<any>): Subscription | undefined {
  // A closed subscriber can no longer receive anything, so skip subscribing.
  if (innerSubscriber.closed) {
    return undefined;
  }
  return result instanceof Observable
    ? result.subscribe(innerSubscriber)
    : (subscribeTo(result)(innerSubscriber) as Subscription);
}