import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';

/* tslint:disable:max-line-length */
/** @deprecated In future versions, empty notifiers will no longer re-emit the source value on the output observable. */
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<never>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
/* tslint:disable:max-line-length */

/**
 * Delays the emission of items from the source Observable by a given time span
 * determined by the emissions of another Observable.
 *
 * <span class="informal">It's like {@link delay}, but the time span of the
 * delay duration is determined by a second Observable.</span>
 *
 * ![](delayWhen.png)
 *
 * `delayWhen` time shifts each emitted value from the source Observable by a
 * time span determined by another Observable. When the source emits a value,
 * the `delayDurationSelector` function is called with the source value as
 * argument, and should return an Observable, called the "duration" Observable.
 * The source value is emitted on the output Observable only when the duration
 * Observable emits a value or completes.
 * The completion of the notifier triggering the emission of the source value
 * is deprecated behavior and will be removed in future versions.
 *
 * Optionally, `delayWhen` takes a second argument, `subscriptionDelay`, which
 * is an Observable. When `subscriptionDelay` emits its first value or
 * completes, the source Observable is subscribed to and starts behaving like
 * described in the previous paragraph. If `subscriptionDelay` is not provided,
 * `delayWhen` will subscribe to the source Observable as soon as the output
 * Observable is subscribed.
 *
 * ## Example
 * Delay each click by a random amount of time, between 0 and 5 seconds
 * ```ts
 * import { fromEvent, interval } from 'rxjs';
 * import { delayWhen } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const delayedClicks = clicks.pipe(
 *   delayWhen(event => interval(Math.random() * 5000)),
 * );
 * delayedClicks.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link delay}
 * @see {@link throttle}
 * @see {@link throttleTime}
 * @see {@link debounce}
 * @see {@link debounceTime}
 * @see {@link sample}
 * @see {@link sampleTime}
 * @see {@link audit}
 * @see {@link auditTime}
 *
 * @param {function(value: T, index: number): Observable} delayDurationSelector A function that
 * returns an Observable for each value emitted by the source Observable, which
 * is then used to delay the emission of that item on the output Observable
 * until the Observable returned from this function emits a value or
 * (deprecated) completes.
 * @param {Observable} subscriptionDelay An Observable that triggers the
 * subscription to the source Observable once it emits any value or completes.
 * @return {Observable} An Observable that delays the emissions of the source
 * Observable by an amount of time specified by the Observable returned by
 * `delayDurationSelector`.
 * @method delayWhen
 * @owner Observable
 */
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>,
                             subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T> {
  if (subscriptionDelay) {
    // Wrap the source so that it is only subscribed once `subscriptionDelay`
    // has signalled, then apply the per-value delay on top of the wrapper.
    return (source: Observable<T>) =>
      new SubscriptionDelayObservable(source, subscriptionDelay)
        .lift(new DelayWhenOperator(delayDurationSelector));
  }
  return (source: Observable<T>) => source.lift(new DelayWhenOperator(delayDurationSelector));
}

/**
 * Operator that subscribes to the source with a `DelayWhenSubscriber`
 * configured with the given `delayDurationSelector`.
 */
class DelayWhenOperator<T> implements Operator<T, T> {
  constructor(private delayDurationSelector: (value: T, index: number) => Observable<any>) {
  }

  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    return source.subscribe(new DelayWhenSubscriber(subscriber, this.delayDurationSelector));
  }
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
  // Set once the source has completed; the destination is completed only
  // after this is true AND no delay notifiers remain pending.
  private completed: boolean = false;
  // Notifier subscriptions that are still open, i.e. source values that are
  // still waiting to be emitted.
  private delayNotifierSubscriptions: Array<Subscription> = [];
  // Running count of source values, passed to `delayDurationSelector`.
  private index: number = 0;

  constructor(destination: Subscriber<T>,
              private delayDurationSelector: (value: T, index: number) => Observable<any>) {
    super(destination);
  }

  /**
   * A notifier emitted: forward the source value it was delaying, drop the
   * notifier, and complete the destination if nothing else is pending.
   */
  notifyNext(outerValue: T, innerValue: any,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, R>): void {
    this.destination.next(outerValue);
    this.removeSubscription(innerSub);
    this.tryComplete();
  }

  /** A notifier errored: route the error through this subscriber's `_error`. */
  notifyError(error: any, innerSub: InnerSubscriber<T, R>): void {
    this._error(error);
  }

  /**
   * A notifier completed: forward the source value it was delaying
   * (deprecated behavior, see the `delayWhen` documentation), then complete
   * the destination if nothing else is pending.
   */
  notifyComplete(innerSub: InnerSubscriber<T, R>): void {
    const value = this.removeSubscription(innerSub);
    // Emit unconditionally: `removeSubscription` always hands back the
    // notifier's `outerValue`, and a truthiness guard here would silently
    // drop legitimate falsy source values such as 0, '', false or null.
    this.destination.next(value);
    this.tryComplete();
  }

  /**
   * Handles a source value: asks the selector for a duration notifier and,
   * if one is returned, subscribes to it. A selector that returns a falsy
   * result causes the value to be dropped. Errors thrown inside the `try`
   * are sent to the destination.
   */
  protected _next(value: T): void {
    const index = this.index++;
    try {
      const delayNotifier = this.delayDurationSelector(value, index);
      if (delayNotifier) {
        this.tryDelay(delayNotifier, value);
      }
    } catch (err) {
      this.destination.error(err);
    }
  }

  /**
   * Handles source completion: records it, completes the destination if no
   * notifiers are pending, then unsubscribes this subscriber.
   */
  protected _complete(): void {
    this.completed = true;
    this.tryComplete();
    this.unsubscribe();
  }

  /**
   * Unsubscribes the given notifier subscription, removes it from the pending
   * list if present, and returns the source value it was associated with.
   */
  private removeSubscription(subscription: InnerSubscriber<T, R>): T {
    subscription.unsubscribe();

    const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription);
    if (subscriptionIdx !== -1) {
      this.delayNotifierSubscriptions.splice(subscriptionIdx, 1);
    }

    return subscription.outerValue;
  }

  /**
   * Subscribes to `delayNotifier` on behalf of `value`. If the resulting
   * subscription is still open afterwards, it is attached to the destination
   * and tracked as pending.
   */
  private tryDelay(delayNotifier: Observable<any>, value: T): void {
    const notifierSubscription = subscribeToResult(this, delayNotifier, value);

    // Only track notifiers that did not already finish during subscription.
    if (notifierSubscription && !notifierSubscription.closed) {
      const destination = this.destination as Subscription;
      destination.add(notifierSubscription);
      this.delayNotifierSubscriptions.push(notifierSubscription);
    }
  }

  /** Completes the destination once the source is done and nothing is pending. */
  private tryComplete(): void {
    if (this.completed && this.delayNotifierSubscriptions.length === 0) {
      this.destination.complete();
    }
  }
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class SubscriptionDelayObservable<T> extends Observable<T> {
  constructor(public source: Observable<T>, private subscriptionDelay: Observable<any>) {
    super();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T>): TeardownLogic {
    // Return the subscription to `subscriptionDelay` as teardown instead of
    // discarding it, so it is not left dangling when the subscriber goes away
    // before the delay has fired.
    // NOTE(review): relies on the base `Observable` registering the returned
    // teardown with the subscriber — confirm against `Observable.subscribe`.
    return this.subscriptionDelay.subscribe(new SubscriptionDelaySubscriber(subscriber, this.source));
  }
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class SubscriptionDelaySubscriber<T> extends Subscriber<T> {
  // Guards against subscribing `parent` to the source more than once.
  private sourceSubscribed: boolean = false;

  constructor(private parent: Subscriber<T>, private source: Observable<T>) {
    super();
  }

  /** Any value from the subscription delay triggers the source subscription. */
  protected _next(unused: any) {
    this.subscribeToSource();
  }

  /** An error from the subscription delay is forwarded to `parent`. */
  protected _error(err: any) {
    this.unsubscribe();
    this.parent.error(err);
  }

  /** Completion of the subscription delay also triggers the source subscription. */
  protected _complete() {
    this.unsubscribe();
    this.subscribeToSource();
  }

  /**
   * Subscribes `parent` to the source the first time it is called; later
   * calls are no-ops.
   */
  private subscribeToSource(): void {
    if (!this.sourceSubscribed) {
      this.sourceSubscribed = true;
      this.unsubscribe();
      this.source.subscribe(this.parent);
    }
  }
}