1import { Subject } from './Subject'; 2import { SchedulerLike } from './types'; 3import { queue } from './scheduler/queue'; 4import { Subscriber } from './Subscriber'; 5import { Subscription } from './Subscription'; 6import { ObserveOnSubscriber } from './operators/observeOn'; 7import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError'; 8import { SubjectSubscription } from './SubjectSubscription'; 9/** 10 * A variant of Subject that "replays" or emits old values to new subscribers. 11 * It buffers a set number of values and will emit those values immediately to 12 * any new subscribers in addition to emitting new values to existing subscribers. 13 * 14 * @class ReplaySubject<T> 15 */ 16export class ReplaySubject<T> extends Subject<T> { 17 private _events: (ReplayEvent<T> | T)[] = []; 18 private _bufferSize: number; 19 private _windowTime: number; 20 private _infiniteTimeWindow: boolean = false; 21 22 constructor(bufferSize: number = Number.POSITIVE_INFINITY, 23 windowTime: number = Number.POSITIVE_INFINITY, 24 private scheduler?: SchedulerLike) { 25 super(); 26 this._bufferSize = bufferSize < 1 ? 1 : bufferSize; 27 this._windowTime = windowTime < 1 ? 1 : windowTime; 28 29 if (windowTime === Number.POSITIVE_INFINITY) { 30 this._infiniteTimeWindow = true; 31 this.next = this.nextInfiniteTimeWindow; 32 } else { 33 this.next = this.nextTimeWindow; 34 } 35 } 36 37 private nextInfiniteTimeWindow(value: T): void { 38 if (!this.isStopped) { 39 const _events = this._events; 40 _events.push(value); 41 // Since this method is invoked in every next() call than the buffer 42 // can overgrow the max size only by one item 43 if (_events.length > this._bufferSize) { 44 _events.shift(); 45 } 46 } 47 super.next(value); 48 } 49 50 private nextTimeWindow(value: T): void { 51 if (!this.isStopped) { 52 this._events.push(new ReplayEvent(this._getNow(), value)); 53 this._trimBufferThenGetEvents(); 54 } 55 super.next(value); 56 } 57 58 /** @deprecated This is an internal implementation detail, do not use. */ 59 _subscribe(subscriber: Subscriber<T>): Subscription { 60 // When `_infiniteTimeWindow === true` then the buffer is already trimmed 61 const _infiniteTimeWindow = this._infiniteTimeWindow; 62 const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents(); 63 const scheduler = this.scheduler; 64 const len = _events.length; 65 let subscription: Subscription; 66 67 if (this.closed) { 68 throw new ObjectUnsubscribedError(); 69 } else if (this.isStopped || this.hasError) { 70 subscription = Subscription.EMPTY; 71 } else { 72 this.observers.push(subscriber); 73 subscription = new SubjectSubscription(this, subscriber); 74 } 75 76 if (scheduler) { 77 subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler)); 78 } 79 80 if (_infiniteTimeWindow) { 81 for (let i = 0; i < len && !subscriber.closed; i++) { 82 subscriber.next(<T>_events[i]); 83 } 84 } else { 85 for (let i = 0; i < len && !subscriber.closed; i++) { 86 subscriber.next((<ReplayEvent<T>>_events[i]).value); 87 } 88 } 89 90 if (this.hasError) { 91 subscriber.error(this.thrownError); 92 } else if (this.isStopped) { 93 subscriber.complete(); 94 } 95 96 return subscription; 97 } 98 99 _getNow(): number { 100 return (this.scheduler || queue).now(); 101 } 102 103 private _trimBufferThenGetEvents(): ReplayEvent<T>[] { 104 const now = this._getNow(); 105 const _bufferSize = this._bufferSize; 106 const _windowTime = this._windowTime; 107 const _events = <ReplayEvent<T>[]>this._events; 108 109 const eventsCount = _events.length; 110 let spliceCount = 0; 111 112 // Trim events that fall out of the time window. 113 // Start at the front of the list. Break early once 114 // we encounter an event that falls within the window. 115 while (spliceCount < eventsCount) { 116 if ((now - _events[spliceCount].time) < _windowTime) { 117 break; 118 } 119 spliceCount++; 120 } 121 122 if (eventsCount > _bufferSize) { 123 spliceCount = Math.max(spliceCount, eventsCount - _bufferSize); 124 } 125 126 if (spliceCount > 0) { 127 _events.splice(0, spliceCount); 128 } 129 130 return _events; 131 } 132 133} 134 135class ReplayEvent<T> { 136 constructor(public time: number, public value: T) { 137 } 138} 139