1import { Operator } from '../Operator';
2import { Subscriber } from '../Subscriber';
3import { Observable } from '../Observable';
4import { Subject } from '../Subject';
5import { Subscription } from '../Subscription';
6import { OuterSubscriber } from '../OuterSubscriber';
7import { InnerSubscriber } from '../InnerSubscriber';
8import { subscribeToResult } from '../util/subscribeToResult';
9import { OperatorFunction } from '../types';
10
11/**
12 * Branch out the source Observable values as a nested Observable starting from
13 * an emission from `openings` and ending when the output of `closingSelector`
14 * emits.
15 *
16 * <span class="informal">It's like {@link bufferToggle}, but emits a nested
17 * Observable instead of an array.</span>
18 *
19 * ![](windowToggle.png)
20 *
21 * Returns an Observable that emits windows of items it collects from the source
22 * Observable. The output Observable emits windows that contain those items
23 * emitted by the source Observable between the time when the `openings`
24 * Observable emits an item and when the Observable returned by
25 * `closingSelector` emits an item.
26 *
27 * ## Example
28 * Every other second, emit the click events from the next 500ms
29 * ```ts
30 * import { fromEvent, interval, EMPTY } from 'rxjs';
31 * import { windowToggle, mergeAll } from 'rxjs/operators';
32 *
33 * const clicks = fromEvent(document, 'click');
34 * const openings = interval(1000);
35 * const result = clicks.pipe(
36 *   windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
37 *   mergeAll()
38 * );
39 * result.subscribe(x => console.log(x));
40 * ```
41 *
42 * @see {@link window}
43 * @see {@link windowCount}
44 * @see {@link windowTime}
45 * @see {@link windowWhen}
46 * @see {@link bufferToggle}
47 *
48 * @param {Observable<O>} openings An observable of notifications to start new
49 * windows.
50 * @param {function(value: O): Observable} closingSelector A function that takes
51 * the value emitted by the `openings` observable and returns an Observable,
52 * which, when it emits (either `next` or `complete`), signals that the
53 * associated window should complete.
54 * @return {Observable<Observable<T>>} An observable of windows, which in turn
55 * are Observables.
56 * @method windowToggle
57 * @owner Observable
58 */
59export function windowToggle<T, O>(openings: Observable<O>,
60                                   closingSelector: (openValue: O) => Observable<any>): OperatorFunction<T, Observable<T>> {
61  return (source: Observable<T>) => source.lift(new WindowToggleOperator<T, O>(openings, closingSelector));
62}
63
64class WindowToggleOperator<T, O> implements Operator<T, Observable<T>> {
65
66  constructor(private openings: Observable<O>,
67              private closingSelector: (openValue: O) => Observable<any>) {
68  }
69
70  call(subscriber: Subscriber<Observable<T>>, source: any): any {
71    return source.subscribe(new WindowToggleSubscriber(
72      subscriber, this.openings, this.closingSelector
73    ));
74  }
75}
76
77interface WindowContext<T> {
78  window: Subject<T>;
79  subscription: Subscription;
80}
81
82/**
83 * We need this JSDoc comment for affecting ESDoc.
84 * @ignore
85 * @extends {Ignored}
86 */
87class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
88  private contexts: WindowContext<T>[] = [];
89  private openSubscription: Subscription;
90
91  constructor(destination: Subscriber<Observable<T>>,
92              private openings: Observable<O>,
93              private closingSelector: (openValue: O) => Observable<any>) {
94    super(destination);
95    this.add(this.openSubscription = subscribeToResult(this, openings, openings as any));
96  }
97
98  protected _next(value: T) {
99    const { contexts } = this;
100    if (contexts) {
101      const len = contexts.length;
102      for (let i = 0; i < len; i++) {
103        contexts[i].window.next(value);
104      }
105    }
106  }
107
108  protected _error(err: any) {
109
110    const { contexts } = this;
111    this.contexts = null;
112
113    if (contexts) {
114      const len = contexts.length;
115      let index = -1;
116
117      while (++index < len) {
118        const context = contexts[index];
119        context.window.error(err);
120        context.subscription.unsubscribe();
121      }
122    }
123
124    super._error(err);
125  }
126
127  protected _complete() {
128    const { contexts } = this;
129    this.contexts = null;
130    if (contexts) {
131      const len = contexts.length;
132      let index = -1;
133      while (++index < len) {
134        const context = contexts[index];
135        context.window.complete();
136        context.subscription.unsubscribe();
137      }
138    }
139    super._complete();
140  }
141
142  /** @deprecated This is an internal implementation detail, do not use. */
143  _unsubscribe() {
144    const { contexts } = this;
145    this.contexts = null;
146    if (contexts) {
147      const len = contexts.length;
148      let index = -1;
149      while (++index < len) {
150        const context = contexts[index];
151        context.window.unsubscribe();
152        context.subscription.unsubscribe();
153      }
154    }
155  }
156
157  notifyNext(outerValue: any, innerValue: any,
158             outerIndex: number, innerIndex: number,
159             innerSub: InnerSubscriber<T, any>): void {
160
161    if (outerValue === this.openings) {
162      let closingNotifier;
163      try {
164        const { closingSelector } = this;
165        closingNotifier = closingSelector(innerValue);
166      } catch (e) {
167        return this.error(e);
168      }
169
170      const window = new Subject<T>();
171      const subscription = new Subscription();
172      const context = { window, subscription };
173      this.contexts.push(context);
174      const innerSubscription = subscribeToResult(this, closingNotifier, context as any);
175
176      if (innerSubscription.closed) {
177        this.closeWindow(this.contexts.length - 1);
178      } else {
179        (<any>innerSubscription).context = context;
180        subscription.add(innerSubscription);
181      }
182
183      this.destination.next(window);
184    } else {
185      this.closeWindow(this.contexts.indexOf(outerValue));
186    }
187  }
188
189  notifyError(err: any): void {
190    this.error(err);
191  }
192
193  notifyComplete(inner: Subscription): void {
194    if (inner !== this.openSubscription) {
195      this.closeWindow(this.contexts.indexOf((<any> inner).context));
196    }
197  }
198
199  private closeWindow(index: number): void {
200    if (index === -1) {
201      return;
202    }
203
204    const { contexts } = this;
205    const context = contexts[index];
206    const { window, subscription } = context;
207    contexts.splice(index, 1);
208    window.complete();
209    subscription.unsubscribe();
210  }
211}
212