1import logging
2
3from typing import cast, Any, Optional, Set
4
5from rx.core import typing
6from rx.disposable import CompositeDisposable, Disposable, SingleAssignmentDisposable
7
8from ..periodicscheduler import PeriodicScheduler
9
10
# Module-level logger; messages from this module go to the logger named "Rx".
log = logging.getLogger("Rx")
12
13
class WxScheduler(PeriodicScheduler):
    """A scheduler for a wxPython event loop.

    Every scheduled action is backed by its own instance of a ``wx.Timer``
    subclass. The scheduler keeps a reference to each timer that may still
    fire so that pending work can be cancelled with :meth:`cancel_all`.
    """

    def __init__(self, wx: Any) -> None:
        """Create a new WxScheduler.

        Args:
            wx: The wx module to use; typically, you would get this by
                import wx
        """

        super().__init__()
        self._wx = wx
        timer_class: Any = self._wx.Timer

        class Timer(timer_class):
            """Timer that forwards its ``Notify`` call to a plain callback."""

            def __init__(self, callback) -> None:
                super().__init__()
                self.callback = callback

            def Notify(self):
                # Expected to be invoked by wx when the timer elapses
                # (see the wx.Timer documentation).
                self.callback()

        self._timer_class = Timer
        # Timers that may still fire. One-shot timers are dropped from this
        # set once they have fired; periodic timers stay until disposed or
        # until cancel_all() is called.
        self._timers: Set[Timer] = set()

    def cancel_all(self) -> None:
        """Cancel all scheduled actions.

        Should be called when destroying wx controls to prevent
        accessing dead wx objects in actions that might be pending.
        """
        timers = self._timers
        # Drain the set while stopping so that stopped timers are no longer
        # referenced by the scheduler.
        while timers:
            timers.pop().Stop()

    def _wxtimer_schedule(self,
                          time: typing.AbsoluteOrRelativeTime,
                          action: typing.ScheduledSingleOrPeriodicAction,
                          state: Optional[typing.TState] = None,
                          periodic: bool = False
                          ) -> typing.Disposable:
        """Start a wx timer that runs the given action.

        Args:
            time: Delay (or period, when ``periodic`` is true); converted
                with ``self.to_seconds`` and rounded down to whole
                milliseconds, with a minimum of 1 ms.
            action: Action to run when the timer fires. A periodic action
                is called with the current state and its return value
                becomes the next state; a single action is called with
                this scheduler and the state.
            state: [Optional] state to be given to the action function.
            periodic: If true, start a continuous timer instead of a
                one-shot timer.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """
        scheduler = self

        sad = SingleAssignmentDisposable()

        def interval() -> None:
            nonlocal state
            if periodic:
                state = cast(typing.ScheduledPeriodicAction, action)(state)
            else:
                # A one-shot timer will not fire again, so stop tracking it
                # before running the action; otherwise every fired timer
                # would stay in the set until its disposable is disposed.
                scheduler._timers.discard(timer)
                sad.disposable = cast(typing.ScheduledAction, action)(scheduler, state)

        msecs = max(1, int(self.to_seconds(time) * 1000.0))  # Must be non-zero

        log.debug("timeout wx: %s", msecs)

        timer = self._timer_class(interval)
        timer.Start(
            msecs,
            self._wx.TIMER_CONTINUOUS if periodic else self._wx.TIMER_ONE_SHOT
        )
        self._timers.add(timer)

        def dispose() -> None:
            timer.Stop()
            # discard() rather than remove(): the timer may already have
            # been dropped because it fired or because cancel_all() ran.
            self._timers.discard(timer)

        return CompositeDisposable(sad, Disposable(dispose))

    def schedule(self,
                 action: typing.ScheduledAction,
                 state: Optional[typing.TState] = None
                 ) -> typing.Disposable:
        """Schedules an action to be executed.

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        return self._wxtimer_schedule(0.0, action, state=state)

    def schedule_relative(self,
                          duetime: typing.RelativeTime,
                          action: typing.ScheduledAction,
                          state: Optional[typing.TState] = None
                          ) -> typing.Disposable:
        """Schedules an action to be executed after duetime.

        Args:
            duetime: Relative time after which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """
        return self._wxtimer_schedule(duetime, action, state=state)

    def schedule_absolute(self,
                          duetime: typing.AbsoluteTime,
                          action: typing.ScheduledAction,
                          state: Optional[typing.TState] = None
                          ) -> typing.Disposable:
        """Schedules an action to be executed at duetime.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = self.to_datetime(duetime)
        return self._wxtimer_schedule(duetime - self.now, action, state=state)

    def schedule_periodic(self,
                          period: typing.RelativeTime,
                          action: typing.ScheduledPeriodicAction,
                          state: Optional[typing.TState] = None
                          ) -> typing.Disposable:
        """Schedules a periodic piece of work to be executed in the loop.

        Args:
            period: Period in seconds for running the work repeatedly.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        return self._wxtimer_schedule(period, action, state=state, periodic=True)
157