import { Operator } from '../Operator';
import { async } from '../scheduler/async';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { isScheduler } from '../util/isScheduler';
import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types';

/* tslint:disable:max-line-length */
export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
/* tslint:enable:max-line-length */

/**
 * Buffers the source Observable values for a specific time period.
 *
 * <span class="informal">Collects values from the past as an array, and emits
 * those arrays periodically in time.</span>
 *
 * ![](bufferTime.png)
 *
 * Values from the source are collected for the duration given by
 * `bufferTimeSpan`. When the optional `bufferCreationInterval` is omitted, the
 * buffer is emitted and reset every `bufferTimeSpan` milliseconds. When
 * `bufferCreationInterval` is provided, a new buffer is opened every
 * `bufferCreationInterval` milliseconds, and each buffer is closed (emitted
 * and discarded) `bufferTimeSpan` milliseconds after it was opened. When the
 * optional `maxBufferSize` is provided, a buffer is closed either after
 * `bufferTimeSpan` milliseconds or as soon as it holds `maxBufferSize`
 * elements.
 *
 * ## Examples
 *
 * Every second, emit an array of the recent click events
 *
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { bufferTime } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferTime(1000));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * Every 5 seconds, emit the click events from the next 2 seconds
 *
 * ```ts
 * import { fromEvent } from 'rxjs';
 * import { bufferTime } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferTime(2000, 5000));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link buffer}
 * @see {@link bufferCount}
 * @see {@link bufferToggle}
 * @see {@link bufferWhen}
 * @see {@link windowTime}
 *
 * @param {number} bufferTimeSpan The amount of time to fill each buffer array.
 * @param {number} [bufferCreationInterval] The interval at which to start new
 * buffers.
 * @param {number} [maxBufferSize] The maximum buffer size.
 * @param {SchedulerLike} [scheduler=async] The scheduler on which to schedule the
 * intervals that determine buffer boundaries.
 * @return {Observable<T[]>} An observable of arrays of buffered values.
 * @method bufferTime
 * @owner Observable
 */
export function bufferTime<T>(bufferTimeSpan: number): OperatorFunction<T, T[]> {
  // Capture `arguments` once: the overloads accept a variable-length call in
  // which a trailing scheduler, when present, is always the last argument.
  const args = arguments;
  const trailingArg = args[args.length - 1];
  const hasScheduler = isScheduler(trailingArg);

  const scheduler: SchedulerLike = hasScheduler ? trailingArg : async;
  // Number of positional arguments that are not the scheduler.
  const valueArgCount = hasScheduler ? args.length - 1 : args.length;

  const bufferCreationInterval: number = valueArgCount >= 2 ? args[1] : null;
  const maxBufferSize: number = valueArgCount >= 3 ? args[2] : Number.POSITIVE_INFINITY;

  return function bufferTimeOperatorFunction(source: Observable<T>) {
    const operator = new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler);
    return source.lift(operator);
  };
}

/**
 * Operator that carries the bufferTime configuration and, when called,
 * subscribes a BufferTimeSubscriber built from that configuration to the source.
 */
class BufferTimeOperator<T> implements Operator<T, T[]> {
  constructor(private bufferTimeSpan: number,
              private bufferCreationInterval: number,
              private maxBufferSize: number,
              private scheduler: SchedulerLike) {
  }

  call(subscriber: Subscriber<T[]>, source: any): any {
    const { bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler } = this;
    const bufferSubscriber = new BufferTimeSubscriber(
      subscriber, bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler
    );
    return source.subscribe(bufferSubscriber);
  }
}

/** One open buffer together with the scheduled action that will close it. */
class Context<T> {
  buffer: T[] = [];
  closeAction: Subscription;
}

/** State handed to the scheduled work that opens new buffers. */
interface DispatchCreateArg<T> {
  bufferTimeSpan: number;
  bufferCreationInterval: number;
  subscriber: BufferTimeSubscriber<T>;
  scheduler: SchedulerLike;
}

/** State handed to the scheduled work that closes a single buffer. */
interface DispatchCloseArg<T> {
  subscriber: BufferTimeSubscriber<T>;
  context: Context<T>;
}

/**
 * We need this JSDoc comment for affecting ESDoc.
129 * @ignore 130 * @extends {Ignored} 131 */ 132class BufferTimeSubscriber<T> extends Subscriber<T> { 133 private contexts: Array<Context<T>> = []; 134 private timespanOnly: boolean; 135 136 constructor(destination: Subscriber<T[]>, 137 private bufferTimeSpan: number, 138 private bufferCreationInterval: number, 139 private maxBufferSize: number, 140 private scheduler: SchedulerLike) { 141 super(destination); 142 const context = this.openContext(); 143 this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0; 144 if (this.timespanOnly) { 145 const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan }; 146 this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState)); 147 } else { 148 const closeState = { subscriber: this, context }; 149 const creationState: DispatchCreateArg<T> = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler }; 150 this.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, closeState)); 151 this.add(scheduler.schedule<DispatchCreateArg<T>>(dispatchBufferCreation, bufferCreationInterval, creationState)); 152 } 153 } 154 155 protected _next(value: T) { 156 const contexts = this.contexts; 157 const len = contexts.length; 158 let filledBufferContext: Context<T>; 159 for (let i = 0; i < len; i++) { 160 const context = contexts[i]; 161 const buffer = context.buffer; 162 buffer.push(value); 163 if (buffer.length == this.maxBufferSize) { 164 filledBufferContext = context; 165 } 166 } 167 168 if (filledBufferContext) { 169 this.onBufferFull(filledBufferContext); 170 } 171 } 172 173 protected _error(err: any) { 174 this.contexts.length = 0; 175 super._error(err); 176 } 177 178 protected _complete() { 179 const { contexts, destination } = this; 180 while (contexts.length > 0) { 181 const context = contexts.shift(); 182 destination.next(context.buffer); 183 } 184 super._complete(); 185 } 186 187 /** 
@deprecated This is an internal implementation detail, do not use. */ 188 _unsubscribe() { 189 this.contexts = null; 190 } 191 192 protected onBufferFull(context: Context<T>) { 193 this.closeContext(context); 194 const closeAction = context.closeAction; 195 closeAction.unsubscribe(); 196 this.remove(closeAction); 197 198 if (!this.closed && this.timespanOnly) { 199 context = this.openContext(); 200 const bufferTimeSpan = this.bufferTimeSpan; 201 const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan }; 202 this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState)); 203 } 204 } 205 206 openContext(): Context<T> { 207 const context: Context<T> = new Context<T>(); 208 this.contexts.push(context); 209 return context; 210 } 211 212 closeContext(context: Context<T>) { 213 this.destination.next(context.buffer); 214 const contexts = this.contexts; 215 216 const spliceIndex = contexts ? contexts.indexOf(context) : -1; 217 if (spliceIndex >= 0) { 218 contexts.splice(contexts.indexOf(context), 1); 219 } 220 } 221} 222 223function dispatchBufferTimeSpanOnly(this: SchedulerAction<any>, state: any) { 224 const subscriber: BufferTimeSubscriber<any> = state.subscriber; 225 226 const prevContext = state.context; 227 if (prevContext) { 228 subscriber.closeContext(prevContext); 229 } 230 231 if (!subscriber.closed) { 232 state.context = subscriber.openContext(); 233 state.context.closeAction = this.schedule(state, state.bufferTimeSpan); 234 } 235} 236 237function dispatchBufferCreation<T>(this: SchedulerAction<DispatchCreateArg<T>>, state: DispatchCreateArg<T>) { 238 const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state; 239 const context = subscriber.openContext(); 240 const action = <SchedulerAction<DispatchCreateArg<T>>>this; 241 if (!subscriber.closed) { 242 subscriber.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, { 
subscriber, context })); 243 action.schedule(state, bufferCreationInterval); 244 } 245} 246 247function dispatchBufferClose<T>(arg: DispatchCloseArg<T>) { 248 const { subscriber, context } = arg; 249 subscriber.closeContext(context); 250} 251