import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { OperatorFunction } from '../types';

/**
 * Branch out the source Observable values as a nested Observable starting from
 * an emission from `openings` and ending when the output of `closingSelector`
 * emits.
 *
 * <span class="informal">It's like {@link bufferToggle}, but emits a nested
 * Observable instead of an array.</span>
 *
 * ![](windowToggle.png)
 *
 * Returns an Observable that emits windows of items it collects from the source
 * Observable. The output Observable emits windows that contain those items
 * emitted by the source Observable between the time when the `openings`
 * Observable emits an item and when the Observable returned by
 * `closingSelector` emits an item.
 *
 * Several windows may be open at the same time; every source value is pushed
 * to each window that is currently open. If the Observable returned by
 * `closingSelector` emits or completes synchronously on subscription, the
 * corresponding window is completed before it is emitted downstream.
 *
 * ## Example
 * Every other second, emit the click events from the next 500ms
 * ```ts
 * import { fromEvent, interval, EMPTY } from 'rxjs';
 * import { windowToggle, mergeAll } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const openings = interval(1000);
 * const result = clicks.pipe(
 *   windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
 *   mergeAll()
 * );
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link window}
 * @see {@link windowCount}
 * @see {@link windowTime}
 * @see {@link windowWhen}
 * @see {@link bufferToggle}
 *
 * @param {Observable<O>} openings An observable of notifications to start new
 * windows.
 * @param {function(value: O): Observable} closingSelector A function that takes
 * the value emitted by the `openings` observable and returns an Observable,
 * which, when it emits (either `next` or `complete`), signals that the
 * associated window should complete.
 * @return {Observable<Observable<T>>} An observable of windows, which in turn
 * are Observables.
 * @method windowToggle
 * @owner Observable
 */
export function windowToggle<T, O>(openings: Observable<O>,
                                   closingSelector: (openValue: O) => Observable<any>): OperatorFunction<T, Observable<T>> {
  return (source: Observable<T>) => source.lift(new WindowToggleOperator<T, O>(openings, closingSelector));
}

/**
 * Operator object handed to `lift`; on `call` it subscribes a
 * `WindowToggleSubscriber` to the source.
 */
class WindowToggleOperator<T, O> implements Operator<T, Observable<T>> {

  constructor(private openings: Observable<O>,
              private closingSelector: (openValue: O) => Observable<any>) {
  }

  call(subscriber: Subscriber<Observable<T>>, source: any): any {
    return source.subscribe(new WindowToggleSubscriber(
      subscriber, this.openings, this.closingSelector
    ));
  }
}

/**
 * Book-keeping for one open window.
 */
interface WindowContext<T> {
  /** Subject that receives the source values while the window is open. */
  window: Subject<T>;
  /** Holds the subscription to this window's closing notifier. */
  subscription: Subscription;
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
  /** Currently open windows; set to `null` once the subscriber is torn down. */
  private contexts: WindowContext<T>[] | null = [];
  private openSubscription: Subscription;

  constructor(destination: Subscriber<Observable<T>>,
              private openings: Observable<O>,
              private closingSelector: (openValue: O) => Observable<any>) {
    super(destination);
    // `openings` itself is passed as the tag value so that `notifyNext` can
    // tell "open a window" notifications apart from closing notifications.
    this.add(this.openSubscription = subscribeToResult(this, openings, openings as any));
  }

  /** Forwards a source value to every window that is open right now. */
  protected _next(value: T) {
    const { contexts } = this;
    if (contexts) {
      // Iterate over a snapshot: delivering a value may re-entrantly close a
      // window, which splices `contexts` and would otherwise leave this loop
      // reading past the end of the array.
      const snapshot = contexts.slice();
      for (let i = 0; i < snapshot.length; i++) {
        snapshot[i].window.next(value);
      }
    }
  }

  /** Errors every open window, then forwards the error downstream. */
  protected _error(err: any) {
    this.releaseContexts(window => window.error(err));
    super._error(err);
  }

  /** Completes every open window, then completes downstream. */
  protected _complete() {
    this.releaseContexts(window => window.complete());
    super._complete();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _unsubscribe() {
    this.releaseContexts(window => window.unsubscribe());
  }

  /**
   * Handles a `next` from one of the inner subscriptions: an emission from
   * `openings` opens a new window, an emission from a closing notifier closes
   * the window it belongs to.
   */
  notifyNext(outerValue: any, innerValue: any,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, any>): void {
    const { contexts } = this;
    if (!contexts) {
      // Already torn down (error/complete/unsubscribe); nothing to open or close.
      return;
    }

    if (outerValue !== this.openings) {
      // A closing notifier emitted; its tag value is the context to close.
      this.closeWindow(outerValue);
      return;
    }

    let closingNotifier;
    try {
      // Called as a plain function (not as a method of this subscriber).
      const { closingSelector } = this;
      closingNotifier = closingSelector(innerValue);
    } catch (e) {
      return this.error(e);
    }

    const window = new Subject<T>();
    const subscription = new Subscription();
    const context = { window, subscription };
    contexts.push(context);
    const innerSubscription = subscribeToResult(this, closingNotifier, context as any);

    if (innerSubscription.closed) {
      // The notifier finished synchronously. Close by identity: if it emitted
      // a value first, this context is already gone and the last entry of
      // `contexts` is some other, still-open window that must not be touched.
      this.closeWindow(context);
    } else {
      // Remember the context on the inner subscription so `notifyComplete`
      // can find the window when the notifier completes without emitting.
      (<any>innerSubscription).context = context;
      subscription.add(innerSubscription);
    }

    this.destination.next(window);
  }

  /** Any inner error (from `openings` or a closing notifier) fails the whole operator. */
  notifyError(err: any): void {
    this.error(err);
  }

  /**
   * Completion of a closing notifier closes its window; completion of
   * `openings` is ignored so that already-open windows keep running.
   */
  notifyComplete(inner: Subscription): void {
    if (inner !== this.openSubscription) {
      this.closeWindow((<any> inner).context);
    }
  }

  /**
   * Removes `context` from the open set, completes its window and releases its
   * closing-notifier subscription. Does nothing when the context is unknown,
   * was already closed, or the subscriber has been torn down.
   */
  private closeWindow(context: WindowContext<T> | undefined): void {
    const { contexts } = this;
    if (!contexts) {
      return;
    }

    const index = contexts.indexOf(context as WindowContext<T>);
    if (index === -1) {
      return;
    }

    const { window, subscription } = contexts[index];
    contexts.splice(index, 1);
    window.complete();
    subscription.unsubscribe();
  }

  /**
   * Detaches the list of open windows (so later notifications see a torn-down
   * subscriber), applies `finish` to each window and unsubscribes its closing
   * notifier.
   */
  private releaseContexts(finish: (window: Subject<T>) => void): void {
    const { contexts } = this;
    this.contexts = null;
    if (contexts) {
      for (let i = 0; i < contexts.length; i++) {
        const { window, subscription } = contexts[i];
        finish(window);
        subscription.unsubscribe();
      }
    }
  }
}