import { Subject } from './Subject';
import { SchedulerLike } from './types';
import { queue } from './scheduler/queue';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { ObserveOnSubscriber } from './operators/observeOn';
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
import { SubjectSubscription } from './SubjectSubscription';

/**
 * A variant of Subject that "replays" or emits old values to new subscribers.
 * It buffers a set number of values and will emit those values immediately to
 * any new subscribers in addition to emitting new values to existing subscribers.
 *
 * Values passed to `next` after the subject has stopped (completed or errored)
 * are not added to the replay buffer, so late subscribers only receive values
 * that were buffered before the subject stopped.
 *
 * @class ReplaySubject<T>
 */
export class ReplaySubject<T> extends Subject<T> {
  /**
   * Replay buffer. Holds raw values when the time window is infinite and
   * `ReplayEvent` wrappers (value + timestamp) otherwise.
   */
  private _events: (ReplayEvent<T> | T)[] = [];
  private _bufferSize: number;
  private _windowTime: number;
  private _infiniteTimeWindow: boolean = false;

  /**
   * @param bufferSize Maximum number of values kept for replay. Values below 1
   * are clamped to 1.
   * @param windowTime Maximum age of a buffered value, compared against the
   * difference of two `now()` readings. Values below 1 are clamped to 1.
   * @param scheduler Optional scheduler used as the time source for the window
   * and to deliver notifications to subscribers via `ObserveOnSubscriber`.
   */
  constructor(bufferSize: number = Number.POSITIVE_INFINITY,
              windowTime: number = Number.POSITIVE_INFINITY,
              private scheduler?: SchedulerLike) {
    super();
    this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
    this._windowTime = windowTime < 1 ? 1 : windowTime;

    // Select the `next` implementation once, up front, so that the time-window
    // bookkeeping is skipped entirely when no window was requested.
    if (windowTime === Number.POSITIVE_INFINITY) {
      this._infiniteTimeWindow = true;
      this.next = this.nextInfiniteTimeWindow;
    } else {
      this.next = this.nextTimeWindow;
    }
  }

  /**
   * `next` implementation used when there is no time window: buffers the raw
   * value, bounded by `_bufferSize`, then forwards it to the base class.
   */
  private nextInfiniteTimeWindow(value: T): void {
    // Do not record values once the subject has stopped; otherwise they would
    // be replayed to late subscribers even though no live observer saw them.
    if (!this.isStopped) {
      const _events = this._events;
      _events.push(value);
      // Since this method is invoked on every next() call, the buffer
      // can exceed the max size by at most one item.
      if (_events.length > this._bufferSize) {
        _events.shift();
      }
    }

    super.next(value);
  }

  /**
   * `next` implementation used when a finite time window is configured:
   * buffers the value together with the current time, trims the buffer, then
   * forwards the value to the base class.
   */
  private nextTimeWindow(value: T): void {
    // Same guard as in `nextInfiniteTimeWindow`: a stopped subject must not
    // keep growing its replay buffer.
    if (!this.isStopped) {
      this._events.push(new ReplayEvent(this._getNow(), value));
      this._trimBufferThenGetEvents();
    }

    super.next(value);
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T>): Subscription {
    // When `_infiniteTimeWindow === true` then the buffer is already trimmed
    const _infiniteTimeWindow = this._infiniteTimeWindow;
    const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();
    const scheduler = this.scheduler;
    // Snapshot the length so that values pushed while replaying are not
    // included in this replay pass.
    const len = _events.length;
    let subscription: Subscription;

    if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else if (this.isStopped || this.hasError) {
      // Nothing further will be emitted live; only the replay and the
      // terminal notification below are delivered.
      subscription = Subscription.EMPTY;
    } else {
      this.observers.push(subscriber);
      subscription = new SubjectSubscription(this, subscriber);
    }

    if (scheduler) {
      // `subscriber.add` is resolved against the original subscriber before the
      // argument rebinds `subscriber` to the scheduling wrapper, so the wrapper
      // is registered on the original subscriber and used for the replay below.
      subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
    }

    if (_infiniteTimeWindow) {
      for (let i = 0; i < len && !subscriber.closed; i++) {
        subscriber.next(_events[i] as T);
      }
    } else {
      for (let i = 0; i < len && !subscriber.closed; i++) {
        subscriber.next((_events[i] as ReplayEvent<T>).value);
      }
    }

    if (this.hasError) {
      subscriber.error(this.thrownError);
    } else if (this.isStopped) {
      subscriber.complete();
    }

    return subscription;
  }

  /**
   * Returns the current time as reported by the configured scheduler, falling
   * back to the `queue` scheduler when none was provided.
   */
  _getNow(): number {
    return (this.scheduler || queue).now();
  }

  /**
   * Drops, in place, buffered events that are older than the time window or
   * that exceed the buffer size, and returns the (same) trimmed array.
   * Only meaningful when the buffer holds `ReplayEvent` instances.
   */
  private _trimBufferThenGetEvents(): ReplayEvent<T>[] {
    const now = this._getNow();
    const _bufferSize = this._bufferSize;
    const _windowTime = this._windowTime;
    const _events = this._events as ReplayEvent<T>[];

    const eventsCount = _events.length;
    let spliceCount = 0;

    // Trim events that fall out of the time window.
    // Start at the front of the list. Break early once
    // we encounter an event that falls within the window.
    while (spliceCount < eventsCount) {
      if ((now - _events[spliceCount].time) < _windowTime) {
        break;
      }
      spliceCount++;
    }

    // Also drop however many of the oldest events are needed to respect the
    // size bound, whichever of the two limits removes more.
    if (eventsCount > _bufferSize) {
      spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);
    }

    if (spliceCount > 0) {
      _events.splice(0, spliceCount);
    }

    return _events;
  }

}

/**
 * A buffered value paired with the time at which it was recorded.
 */
class ReplayEvent<T> {
  constructor(public time: number, public value: T) {
  }
}