import { SchedulerLike, SchedulerAction } from '../types';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { asap } from '../scheduler/asap';
import { isNumeric } from '../util/isNumeric';

/**
 * State object handed to {@link SubscribeOnObservable.dispatch} when the
 * scheduled work runs: the observable to subscribe to and the subscriber
 * that should receive its notifications.
 */
export interface DispatchArg<T> {
  source: Observable<T>;
  subscriber: Subscriber<T>;
}

/**
 * Observable that does not subscribe to `source` directly; instead, each
 * subscription schedules the real `source.subscribe(...)` call on the
 * configured scheduler, after the configured delay.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @extends {Ignored}
 * @hide true
 */
export class SubscribeOnObservable<T> extends Observable<T> {
  /**
   * Factory wrapper around the constructor.
   *
   * @param source Observable whose subscription is to be scheduled.
   * @param delay Delay passed to the scheduler; defaults to `0`.
   * @param scheduler Scheduler used to run the subscription; defaults to `asap`.
   * @returns A new `SubscribeOnObservable` wrapping `source`.
   * @nocollapse
   */
  static create<T>(source: Observable<T>, delay: number = 0, scheduler: SchedulerLike = asap): Observable<T> {
    return new SubscribeOnObservable(source, delay, scheduler);
  }

  /**
   * Work function given to the scheduler. It is invoked with the scheduler
   * action as `this`, subscribes `arg.subscriber` to `arg.source`, and
   * registers the resulting subscription on the action through `this.add`.
   *
   * @param arg The source/subscriber pair captured at subscription time.
   * @returns The value returned by `this.add(...)`.
   * @nocollapse
   */
  static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>): Subscription {
    return this.add(arg.source.subscribe(arg.subscriber));
  }

  /**
   * @param source Observable whose subscription is to be scheduled.
   * @param delayTime Delay passed to the scheduler. Replaced by `0` when it
   * is not numeric (per `isNumeric`) or is negative.
   * @param scheduler Scheduler used to run the subscription. Replaced by
   * `asap` when it is falsy or has no `schedule` function.
   */
  constructor(public source: Observable<T>,
              private delayTime: number = 0,
              private scheduler: SchedulerLike = asap) {
    super();

    // Anything that cannot actually schedule work is swapped for `asap`.
    const canSchedule = scheduler && typeof scheduler.schedule === 'function';
    if (!canSchedule) {
      this.scheduler = asap;
    }

    // Non-numeric or negative delays are normalised to "no delay".
    const delayIsInvalid = !isNumeric(delayTime) || delayTime < 0;
    if (delayIsInvalid) {
      this.delayTime = 0;
    }
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T>) {
    // The source/subscriber pair travels to `dispatch` as the action state.
    const state: DispatchArg<any> = { source: this.source, subscriber };

    return this.scheduler.schedule<DispatchArg<any>>(
      SubscribeOnObservable.dispatch,
      this.delayTime,
      state
    );
  }
}