1import { Observable } from '../Observable';
2import { async } from '../scheduler/async';
3import { SchedulerAction, SchedulerLike } from '../types';
4import { isNumeric } from '../util/isNumeric';
5import { Subscriber } from '../Subscriber';
6
7/**
8 * Creates an Observable that emits sequential numbers every specified
9 * interval of time, on a specified {@link SchedulerLike}.
10 *
11 * <span class="informal">Emits incremental numbers periodically in time.
12 * </span>
13 *
14 * ![](interval.png)
15 *
16 * `interval` returns an Observable that emits an infinite sequence of
17 * ascending integers, with a constant interval of time of your choosing
18 * between those emissions. The first emission is not sent immediately, but
19 * only after the first period has passed. By default, this operator uses the
20 * `async` {@link SchedulerLike} to provide a notion of time, but you may pass any
21 * {@link SchedulerLike} to it.
22 *
23 * ## Example
24 * Emits ascending numbers, one every second (1000ms) up to the number 3
25 * ```ts
26 * import { interval } from 'rxjs';
27 * import { take } from 'rxjs/operators';
28 *
29 * const numbers = interval(1000);
30 *
31 * const takeFourNumbers = numbers.pipe(take(4));
32 *
33 * takeFourNumbers.subscribe(x => console.log('Next: ', x));
34 *
35 * // Logs:
36 * // Next: 0
37 * // Next: 1
38 * // Next: 2
39 * // Next: 3
40 * ```
41 *
42 * @see {@link timer}
43 * @see {@link delay}
44 *
45 * @param {number} [period=0] The interval size in milliseconds (by default)
46 * or the time unit determined by the scheduler's clock.
47 * @param {SchedulerLike} [scheduler=async] The {@link SchedulerLike} to use for scheduling
48 * the emission of values, and providing a notion of "time".
49 * @return {Observable} An Observable that emits a sequential number each time
50 * interval.
51 * @static true
52 * @name interval
53 * @owner Observable
54 */
55export function interval(period = 0,
56                         scheduler: SchedulerLike = async): Observable<number> {
57  if (!isNumeric(period) || period < 0) {
58    period = 0;
59  }
60
61  if (!scheduler || typeof scheduler.schedule !== 'function') {
62    scheduler = async;
63  }
64
65  return new Observable<number>(subscriber => {
66    subscriber.add(
67      scheduler.schedule(dispatch, period, { subscriber, counter: 0, period })
68    );
69    return subscriber;
70  });
71}
72
73function dispatch(this: SchedulerAction<IntervalState>, state: IntervalState) {
74  const { subscriber, counter, period } = state;
75  subscriber.next(counter);
76  this.schedule({ subscriber, counter: counter + 1, period }, period);
77}
78
79interface IntervalState {
80  subscriber: Subscriber<number>;
81  counter: number;
82  period: number;
83}
84