import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';

import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types';
import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';

/**
 * Options controlling which values {@link throttle} emits around each
 * silencing window.
 */
export interface ThrottleConfig {
  /** When truthy, the value that opens a silencing window is emitted immediately. */
  leading?: boolean;
  /** When truthy, the most recent value received during a window is emitted when the window ends. */
  trailing?: boolean;
}

/**
 * Configuration used by {@link throttle} when the caller does not pass one.
 */
export const defaultThrottleConfig: ThrottleConfig = {
  leading: true,
  trailing: false
};

/**
 * Emits a value from the source Observable, then ignores subsequent source
 * values for a duration determined by another Observable, then repeats this
 * process.
 *
 * <span class="informal">It's like {@link throttleTime}, but the silencing
 * duration is determined by a second Observable.</span>
 *
 * ![](throttle.png)
 *
 * `throttle` emits the source Observable values on the output Observable
 * when its internal timer is disabled, and ignores source values when the timer
 * is enabled. Initially, the timer is disabled. As soon as the first source
 * value arrives, it is forwarded to the output Observable, and then the timer
 * is enabled by calling the `durationSelector` function with the source value,
 * which returns the "duration" Observable. When the duration Observable emits a
 * value or completes, the timer is disabled, and this process repeats for the
 * next source value.
 *
 * ## Example
 * Emit clicks at a rate of at most one click per second
 * ```ts
 * import { fromEvent, interval } from 'rxjs';
 * import { throttle } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const result = clicks.pipe(throttle(ev => interval(1000)));
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link audit}
 * @see {@link debounce}
 * @see {@link delayWhen}
 * @see {@link sample}
 * @see {@link throttleTime}
 *
 * @param {function(value: T): SubscribableOrPromise} durationSelector A function
 * that receives a value from the source Observable, for computing the silencing
 * duration for each source value, returned as an Observable or a Promise.
 * @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults
 * to `{ leading: true, trailing: false }`. Note that each flag is coerced to a
 * boolean individually, so a flag omitted from an explicitly passed object is
 * treated as `false` (e.g. `{ trailing: true }` disables `leading`).
 * @return {Observable<T>} An Observable that performs the throttle operation to
 * limit the rate of emissions from the source.
 * @method throttle
 * @owner Observable
 */
export function throttle<T>(durationSelector: (value: T) => SubscribableOrPromise<any>,
                            config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => source.lift(new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing));
}

/**
 * Operator that subscribes a {@link ThrottleSubscriber} to the source on behalf
 * of the downstream subscriber.
 */
class ThrottleOperator<T> implements Operator<T, T> {
  constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>,
              private leading: boolean,
              private trailing: boolean) {
  }

  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    return source.subscribe(
      new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing)
    );
  }
}

/**
 * We need this JSDoc comment for affecting ESDoc
 * @ignore
 * @extends {Ignored}
 */
class ThrottleSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
  /** Subscription to the active duration notifier; `undefined` while not throttling. */
  private _throttled?: Subscription;
  /** Most recent source value that has not been emitted yet. */
  private _sendValue?: T;
  /** Whether `_sendValue` currently holds a pending value. */
  private _hasValue = false;

  constructor(protected destination: Subscriber<T>,
              // Typed with `any` to agree with `throttle`, `ThrottleOperator` and
              // `tryDurationSelector`; the notifier's value type is never used.
              private durationSelector: (value: T) => SubscribableOrPromise<any>,
              private _leading: boolean,
              private _trailing: boolean) {
    super(destination);
  }

  /**
   * Records the latest source value and, when no silencing window is active,
   * either emits it right away (`leading`) or just opens a window.
   */
  protected _next(value: T): void {
    this._hasValue = true;
    this._sendValue = value;

    if (!this._throttled) {
      if (this._leading) {
        this.send();
      } else {
        this.throttle(value);
      }
    }
  }

  /**
   * Emits the pending value, if any, and opens a new silencing window for it.
   */
  private send() {
    if (!this._hasValue) {
      return;
    }
    const value = this._sendValue;
    // Clear the pending state BEFORE emitting or subscribing to the duration.
    // Both calls below can re-enter this subscriber synchronously (a duration
    // that notifies during subscription ends the window and, with `trailing`,
    // calls `send()` again). Resetting afterwards would re-emit the same value
    // recursively and would also wipe out a value stored during re-entrancy.
    this._hasValue = false;
    this._sendValue = undefined;

    this.destination.next(value);
    this.throttle(value!);
  }

  /**
   * Opens a silencing window by subscribing to the duration computed for `value`.
   */
  private throttle(value: T): void {
    const duration = this.tryDurationSelector(value);
    if (!duration) {
      return;
    }
    const inner = innerSubscribe(duration, new SimpleInnerSubscriber(this));
    // If the duration already finished while subscribing, `throttlingDone()` ran
    // before we had a handle to store. Keeping the closed subscription would leave
    // `_throttled` truthy forever and silently drop every later source value.
    if (inner && !inner.closed) {
      this.add(this._throttled = inner);
    }
  }

  /**
   * Invokes the user-supplied `durationSelector`, forwarding a thrown error to
   * the destination and returning `null` in that case.
   */
  private tryDurationSelector(value: T): SubscribableOrPromise<any> | null {
    try {
      return this.durationSelector(value);
    } catch (err) {
      this.destination.error(err);
      return null;
    }
  }

  /**
   * Closes the current silencing window and, when `trailing` is enabled, emits
   * the value that was pending during the window.
   */
  private throttlingDone() {
    const { _throttled, _trailing } = this;
    if (_throttled) {
      _throttled.unsubscribe();
    }
    this._throttled = undefined;

    if (_trailing) {
      this.send();
    }
  }

  /** The duration notifier emitted: the silencing window is over. */
  notifyNext(): void {
    this.throttlingDone();
  }

  /** The duration notifier completed: the silencing window is over. */
  notifyComplete(): void {
    this.throttlingDone();
  }
}