1import logging 2 3eventlet = None 4 5from rx.core import Disposable 6from rx.disposables import SingleAssignmentDisposable, CompositeDisposable 7from rx.concurrency.schedulerbase import SchedulerBase 8 9log = logging.getLogger("Rx") 10 11 12class EventLetEventScheduler(SchedulerBase): 13 """A scheduler that schedules work via the eventlet event loop. 14 15 http://eventlet.net/ 16 """ 17 18 def __init__(self): 19 # Lazy import 20 global eventlet 21 import eventlet 22 import eventlet.hubs 23 24 def schedule(self, action, state=None): 25 """Schedules an action to be executed.""" 26 27 disposable = SingleAssignmentDisposable() 28 29 def interval(): 30 disposable.disposable = self.invoke_action(action, state) 31 32 timer = [eventlet.spawn(interval)] 33 34 def dispose(): 35 timer[0].kill() 36 37 return CompositeDisposable(disposable, Disposable.create(dispose)) 38 39 def schedule_relative(self, duetime, action, state=None): 40 """Schedules an action to be executed after duetime. 41 42 Keyword arguments: 43 duetime -- {timedelta} Relative time after which to execute the action. 44 action -- {Function} Action to be executed. 45 46 Returns {Disposable} The disposable object used to cancel the scheduled 47 action (best effort).""" 48 49 scheduler = self 50 seconds = self.to_relative(duetime)/1000.0 51 if not seconds: 52 return scheduler.schedule(action, state) 53 54 disposable = SingleAssignmentDisposable() 55 56 def interval(): 57 disposable.disposable = self.invoke_action(action, state) 58 59 log.debug("timeout: %s", seconds) 60 timer = [eventlet.spawn_after(seconds, interval)] 61 62 def dispose(): 63 # nonlocal timer 64 timer[0].kill() 65 66 return CompositeDisposable(disposable, Disposable.create(dispose)) 67 68 def schedule_absolute(self, duetime, action, state=None): 69 """Schedules an action to be executed at duetime. 70 71 Keyword arguments: 72 duetime -- {datetime} Absolute time after which to execute the action. 73 action -- {Function} Action to be executed. 74 75 Returns {Disposable} The disposable object used to cancel the scheduled 76 action (best effort).""" 77 78 duetime = self.to_datetime(duetime) 79 return self.schedule_relative(duetime - self.now, action, state) 80 81 @property 82 def now(self): 83 """Represents a notion of time for this scheduler. Tasks being scheduled 84 on a scheduler will adhere to the time denoted by this property.""" 85 86 return self.to_datetime(eventlet.hubs.hub.time.time()) 87