1from typing import Callable, NamedTuple, Any, Optional
2from datetime import timedelta
3
4
5from rx import operators as ops
6from rx.core import Observable, typing
7from rx.scheduler import TimeoutScheduler
8
9
# Record produced by the time_interval operator: a value from the source
# sequence together with the time span measured for it.
TimeInterval = NamedTuple(
    "TimeInterval",
    [
        # The item taken from the source sequence, passed through unchanged.
        ("value", Any),
        # Difference between the scheduler clock reading for this item and
        # the previous reading (the reading taken at subscription for the
        # first item).
        ("interval", timedelta),
    ],
)
13
14
def _time_interval(scheduler: Optional[typing.Scheduler] = None) -> Callable[[Observable], Observable]:
    """Build the ``time_interval`` operator.

    Args:
        scheduler: Scheduler whose clock (``now``) is used to measure the
            intervals. When omitted (or falsy), the scheduler supplied at
            subscription time is used, falling back to
            ``TimeoutScheduler.singleton()``.

    Returns:
        A function that takes a source observable and returns an observable
        of ``TimeInterval`` items.
    """

    def time_interval(source: Observable) -> Observable:
        """Records the time interval between consecutive values in an
        observable sequence.

            >>> res = time_interval(source)

        Args:
            source: Observable sequence whose values are to be timed.

        Return:
            An observable sequence with time interval information on
            values.
        """

        def subscribe(observer, scheduler_):
            # Clock precedence: operator argument, then the subscribe-time
            # scheduler, then the shared TimeoutScheduler instance.
            _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

            # The first value is measured against the moment of subscription.
            last = _scheduler.now

            def mapper(value):
                nonlocal last

                now = _scheduler.now
                span = now - last
                # Each value becomes the baseline for the next one.
                last = now
                return TimeInterval(value=value, interval=span)

            # The scheduler must be passed by keyword: in RxPY 3 the second
            # positional parameter of Observable.subscribe is `on_error`, so
            # a positional scheduler would never reach the upstream
            # subscription.
            return source.pipe(ops.map(mapper)).subscribe(observer, scheduler=scheduler_)

        return Observable(subscribe)

    return time_interval
42