from typing import Callable, NamedTuple, Any, Optional
from datetime import timedelta


from rx import operators as ops
from rx.core import Observable, typing
from rx.scheduler import TimeoutScheduler


class TimeInterval(NamedTuple):
    """A source value paired with the time elapsed before it was seen."""

    # The value emitted by the source observable, passed through unchanged.
    value: Any
    # Scheduler-clock difference between this value and the previous one
    # (or the moment of subscription, for the first value).
    interval: timedelta


def _time_interval(scheduler: Optional[typing.Scheduler] = None) -> Callable[[Observable], Observable]:
    """Build the ``time_interval`` operator.

    Args:
        scheduler: Scheduler whose ``now`` is used as the clock. When
            omitted, the scheduler supplied at subscription time is used,
            falling back to ``TimeoutScheduler.singleton()``.

    Returns:
        A function that maps a source observable to an observable of
        ``TimeInterval`` items.
    """

    def time_interval(source: Observable) -> Observable:
        """Records the time interval between consecutive values in an
        observable sequence.

        >>> res = time_interval(source)

        Args:
            source: Observable whose values should be timed.

        Return:
            An observable sequence with time interval information on
            values.
        """

        def subscribe(observer, scheduler_):
            # Clock precedence: operator argument, then the scheduler given
            # at subscription, then the shared timeout scheduler.
            _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

            # Sampled once per subscription, so the first interval is
            # measured from the moment of subscription.
            last = _scheduler.now

            def mapper(value):
                nonlocal last

                now = _scheduler.now
                span = now - last
                last = now
                return TimeInterval(value=value, interval=span)

            # Forward the subscription-time scheduler by keyword so it is
            # bound to the ``scheduler`` parameter of ``subscribe`` rather
            # than to a positional callback slot.
            return source.pipe(ops.map(mapper)).subscribe(observer, scheduler=scheduler_)

        return Observable(subscribe)

    return time_interval