1from abc import abstractmethod
2from datetime import datetime
3from typing import Optional
4
5from rx.core import typing
6from rx.disposable import Disposable, MultipleAssignmentDisposable
7
8from .scheduler import Scheduler
9
10
class PeriodicScheduler(Scheduler, typing.PeriodicScheduler):
    """Common base for schedulers that can run an action periodically.

    The periodic scheduler implementations in this package, as well as
    those in the mainloop sub-package, build on this class. It provides
    schedule_periodic in terms of schedule_relative, and leaves
    schedule, schedule_relative and schedule_absolute abstract.
    """

    def schedule_periodic(
        self,
        period: typing.RelativeTime,
        action: typing.ScheduledPeriodicAction,
        state: Optional[typing.TState] = None,
    ) -> typing.Disposable:
        """Runs an action repeatedly, once per period.

        The first run is scheduled through schedule_relative after
        period. Every run calls action with the current state, and the
        value it returns becomes the state of the following run. The
        following run is then scheduled after the period minus the time
        the action itself took, as measured with the scheduler's clock.

        If the action raises an Exception, the returned disposable is
        disposed, which ends the recurrence, and the exception is
        re-raised.

        Args:
            period: Time between runs, in seconds or as a timedelta.
            action: Action to run on every iteration.
            state: [Optional] State handed to the action on its first
                run.

        Returns:
            The disposable object that cancels the recurring action
            (best effort).
        """

        # Holds whichever run is currently pending, so that disposing the
        # returned object cancels the most recently scheduled one.
        handle = MultipleAssignmentDisposable()
        interval: float = self.to_seconds(period)

        def tick(scheduler: typing.Scheduler,
                 state: Optional[typing.TState] = None
                 ) -> Optional[Disposable]:
            # A run that fires after cancellation must not call the
            # action again.
            if handle.is_disposed:
                return None

            started_at: datetime = scheduler.now

            try:
                next_state = action(state)
            except Exception:
                # A failing action ends the recurrence; the error still
                # propagates to whoever invoked this run.
                handle.dispose()
                raise

            # Deduct the time spent inside the action from the next
            # delay. The result is negative when the action took longer
            # than the period; it is passed to schedule_relative as is.
            elapsed = (scheduler.now - started_at).total_seconds()
            handle.disposable = scheduler.schedule_relative(
                interval - elapsed, tick, state=next_state)

            return None

        handle.disposable = self.schedule_relative(period, tick, state=state)
        return handle

    @abstractmethod
    def schedule(
        self,
        action: typing.ScheduledAction,
        state: Optional[typing.TState] = None,
    ) -> typing.Disposable:
        """Schedules an action for execution.

        Abstract: this base implementation only returns NotImplemented,
        subclasses supply the actual behavior.

        Args:
            action: Action to be executed.
            state: [Optional] State passed to the action function.

        Returns:
            The disposable object that cancels the scheduled action
            (best effort).
        """

        return NotImplemented

    @abstractmethod
    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: typing.ScheduledAction,
        state: Optional[typing.TState] = None,
    ) -> typing.Disposable:
        """Schedules an action for execution once duetime has elapsed.

        Abstract: this base implementation only returns NotImplemented,
        subclasses supply the actual behavior.

        Args:
            duetime: Relative time to wait before executing the action.
            action: Action to be executed.
            state: [Optional] State passed to the action function.

        Returns:
            The disposable object that cancels the scheduled action
            (best effort).
        """

        return NotImplemented

    @abstractmethod
    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: typing.ScheduledAction,
        state: Optional[typing.TState] = None,
    ) -> typing.Disposable:
        """Schedules an action for execution at the given point in time.

        Abstract: this base implementation only returns NotImplemented,
        subclasses supply the actual behavior.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] State passed to the action function.

        Returns:
            The disposable object that cancels the scheduled action
            (best effort).
        """

        return NotImplemented
117