1from abc import abstractmethod 2from datetime import datetime 3from typing import Optional 4 5from rx.core import typing 6from rx.disposable import Disposable, MultipleAssignmentDisposable 7 8from .scheduler import Scheduler 9 10 11class PeriodicScheduler(Scheduler, typing.PeriodicScheduler): 12 """Base class for the various periodic scheduler implementations in this 13 package as well as the mainloop sub-package. 14 """ 15 16 def schedule_periodic(self, 17 period: typing.RelativeTime, 18 action: typing.ScheduledPeriodicAction, 19 state: Optional[typing.TState] = None 20 ) -> typing.Disposable: 21 """Schedules a periodic piece of work. 22 23 Args: 24 period: Period in seconds or timedelta for running the 25 work periodically. 26 action: Action to be executed. 27 state: [Optional] Initial state passed to the action upon 28 the first iteration. 29 30 Returns: 31 The disposable object used to cancel the scheduled 32 recurring action (best effort). 33 """ 34 35 disp: MultipleAssignmentDisposable = MultipleAssignmentDisposable() 36 seconds: float = self.to_seconds(period) 37 38 def periodic(scheduler: typing.Scheduler, 39 state: Optional[typing.TState] = None 40 ) -> Optional[Disposable]: 41 if disp.is_disposed: 42 return None 43 44 now: datetime = scheduler.now 45 46 try: 47 state = action(state) 48 except Exception: 49 disp.dispose() 50 raise 51 52 time = seconds - (scheduler.now - now).total_seconds() 53 disp.disposable = scheduler.schedule_relative(time, periodic, state=state) 54 55 return None 56 57 disp.disposable = self.schedule_relative(period, periodic, state=state) 58 return disp 59 60 @abstractmethod 61 def schedule(self, 62 action: typing.ScheduledAction, 63 state: Optional[typing.TState] = None 64 ) -> typing.Disposable: 65 """Schedules an action to be executed. 66 67 Args: 68 action: Action to be executed. 69 state: [Optional] state to be given to the action function. 70 71 Returns: 72 The disposable object used to cancel the scheduled action 73 (best effort). 74 """ 75 76 return NotImplemented 77 78 @abstractmethod 79 def schedule_relative(self, 80 duetime: typing.RelativeTime, 81 action: typing.ScheduledAction, 82 state: Optional[typing.TState] = None 83 ) -> typing.Disposable: 84 """Schedules an action to be executed after duetime. 85 86 Args: 87 duetime: Relative time after which to execute the action. 88 action: Action to be executed. 89 state: [Optional] state to be given to the action function. 90 91 Returns: 92 The disposable object used to cancel the scheduled action 93 (best effort). 94 """ 95 96 return NotImplemented 97 98 @abstractmethod 99 def schedule_absolute(self, 100 duetime: typing.AbsoluteTime, 101 action: typing.ScheduledAction, 102 state: Optional[typing.TState] = None 103 ) -> typing.Disposable: 104 """Schedules an action to be executed at duetime. 105 106 Args: 107 duetime: Absolute time at which to execute the action. 108 action: Action to be executed. 109 state: [Optional] state to be given to the action function. 110 111 Returns: 112 The disposable object used to cancel the scheduled action 113 (best effort). 114 """ 115 116 return NotImplemented 117