1import logging
2
3eventlet = None
4
5from rx.core import Disposable
6from rx.disposables import SingleAssignmentDisposable, CompositeDisposable
7from rx.concurrency.schedulerbase import SchedulerBase
8
9log = logging.getLogger("Rx")
10
11
12class EventLetEventScheduler(SchedulerBase):
13    """A scheduler that schedules work via the eventlet event loop.
14
15    http://eventlet.net/
16    """
17
18    def __init__(self):
19        # Lazy import
20        global eventlet
21        import eventlet
22        import eventlet.hubs
23
24    def schedule(self, action, state=None):
25        """Schedules an action to be executed."""
26
27        disposable = SingleAssignmentDisposable()
28
29        def interval():
30            disposable.disposable = self.invoke_action(action, state)
31
32        timer = [eventlet.spawn(interval)]
33
34        def dispose():
35            timer[0].kill()
36
37        return CompositeDisposable(disposable, Disposable.create(dispose))
38
39    def schedule_relative(self, duetime, action, state=None):
40        """Schedules an action to be executed after duetime.
41
42        Keyword arguments:
43        duetime -- {timedelta} Relative time after which to execute the action.
44        action -- {Function} Action to be executed.
45
46        Returns {Disposable} The disposable object used to cancel the scheduled
47        action (best effort)."""
48
49        scheduler = self
50        seconds = self.to_relative(duetime)/1000.0
51        if not seconds:
52            return scheduler.schedule(action, state)
53
54        disposable = SingleAssignmentDisposable()
55
56        def interval():
57            disposable.disposable = self.invoke_action(action, state)
58
59        log.debug("timeout: %s", seconds)
60        timer = [eventlet.spawn_after(seconds, interval)]
61
62        def dispose():
63            # nonlocal timer
64            timer[0].kill()
65
66        return CompositeDisposable(disposable, Disposable.create(dispose))
67
68    def schedule_absolute(self, duetime, action, state=None):
69        """Schedules an action to be executed at duetime.
70
71        Keyword arguments:
72        duetime -- {datetime} Absolute time after which to execute the action.
73        action -- {Function} Action to be executed.
74
75        Returns {Disposable} The disposable object used to cancel the scheduled
76        action (best effort)."""
77
78        duetime = self.to_datetime(duetime)
79        return self.schedule_relative(duetime - self.now, action, state)
80
81    @property
82    def now(self):
83        """Represents a notion of time for this scheduler. Tasks being scheduled
84        on a scheduler will adhere to the time denoted by this property."""
85
86        return self.to_datetime(eventlet.hubs.hub.time.time())
87