"""Hub implementation built on ``select.kqueue``."""
import os
import sys

from eventlet import patcher, support
from eventlet.hubs import hub
import six

# Use the original (un-monkey-patched) modules as provided by the patcher.
select = patcher.original('select')
time = patcher.original('time')


def is_available():
    """Return True when the ``select`` module exposes ``kqueue``."""
    return hasattr(select, 'kqueue')


class Hub(hub.BaseHub):
    """Hub that registers its listeners with a kqueue object.

    ``self._events`` maps ``fileno -> {evtype: kevent}`` and records every
    kevent that has been added to the kqueue, so that the registrations can
    be replayed onto a new kqueue or deleted later.
    """

    # Passed as ``max_events`` to ``kqueue.control`` on each ``wait`` call.
    MAX_EVENTS = 100

    def __init__(self, clock=None):
        """Set up the filter table, the event bookkeeping and the kqueue.

        :param clock: forwarded unchanged to ``hub.BaseHub.__init__``.
        """
        # Translate hub event types into kqueue filter constants.
        self.FILTERS = {
            hub.READ: select.KQ_FILTER_READ,
            hub.WRITE: select.KQ_FILTER_WRITE,
        }
        super(Hub, self).__init__(clock)
        self._events = {}
        self._init_kqueue()

    def _init_kqueue(self):
        """Create a new kqueue and remember the pid that created it."""
        self.kqueue = select.kqueue()
        self._pid = os.getpid()

    def _reinit_kqueue(self):
        """Replace the kqueue and re-register every recorded kevent."""
        self.kqueue.close()
        self._init_kqueue()
        events = [e for i in six.itervalues(self._events)
                  for e in six.itervalues(i)]
        self.kqueue.control(events, 0, 0)

    def _control(self, events, max_events, timeout):
        """Call ``kqueue.control``, rebuilding the kqueue after a fork.

        If the call fails with ``OSError``/``IOError`` and the current pid
        differs from the one recorded when the kqueue was created, the
        kqueue is re-initialised and the call is retried once. Any other
        failure is re-raised.

        :returns: whatever ``kqueue.control`` returns.
        """
        try:
            return self.kqueue.control(events, max_events, timeout)
        except (OSError, IOError):
            # have we forked?
            if os.getpid() != self._pid:
                self._reinit_kqueue()
                return self.kqueue.control(events, max_events, timeout)
            raise

    def add(self, evtype, fileno, cb, tb, mac):
        """Register a listener and, if needed, a kevent for it.

        The arguments are forwarded unchanged to ``hub.BaseHub.add``. A
        kevent is added to the kqueue only when none is recorded yet for
        this ``(fileno, evtype)`` pair.

        :returns: the listener returned by ``hub.BaseHub.add``.
        :raises ValueError: re-raised after the listener has been removed
            from the base hub again.
        """
        listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
        events = self._events.setdefault(fileno, {})
        if evtype not in events:
            try:
                event = select.kevent(
                    fileno, self.FILTERS.get(evtype), select.KQ_EV_ADD)
                self._control([event], 0, 0)
                events[evtype] = event
            except ValueError:
                # Roll back the base-class registration before propagating.
                super(Hub, self).remove(listener)
                raise
        return listener

    def _delete_events(self, events):
        """Submit a ``KQ_EV_DELETE`` change for each kevent in *events*."""
        del_events = [
            select.kevent(e.ident, e.filter, select.KQ_EV_DELETE)
            for e in events
        ]
        self._control(del_events, 0, 0)

    def remove(self, listener):
        """Unregister *listener*; drop its kevent when nothing else uses it.

        The kevent is deleted only when the base hub no longer has a
        listener for the same ``(evtype, fileno)``. ``OSError`` raised while
        deleting the kevent is ignored.
        """
        super(Hub, self).remove(listener)
        evtype = listener.evtype
        fileno = listener.fileno
        if not self.listeners[evtype].get(fileno):
            # The per-descriptor table may already be gone, because
            # remove_descriptor() pops it; treat that as "nothing to
            # delete" instead of raising KeyError.
            fd_events = self._events.get(fileno)
            if fd_events is None:
                return
            event = fd_events.pop(evtype, None)
            if event is None:
                return
            try:
                self._delete_events((event,))
            except OSError:
                pass

    def remove_descriptor(self, fileno):
        """Remove *fileno* from the base hub and delete all its kevents.

        A descriptor with no recorded kevents, or an ``OSError`` while
        deleting them, is silently ignored.
        """
        super(Hub, self).remove_descriptor(fileno)
        try:
            events = self._events.pop(fileno).values()
            self._delete_events(events)
        except (KeyError, OSError):
            pass

    def wait(self, seconds=None):
        """Poll the kqueue once and run the callbacks of ready listeners.

        When there are neither readers nor writers, this just sleeps for
        *seconds* (if truthy) and returns.

        :param seconds: timeout passed to ``kqueue.control``.
        """
        readers = self.listeners[self.READ]
        writers = self.listeners[self.WRITE]

        if not readers and not writers:
            if seconds:
                time.sleep(seconds)
            return
        result = self._control([], self.MAX_EVENTS, seconds)
        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
        for event in result:
            fileno = event.ident
            evfilt = event.filter
            try:
                # hub.noop stands in when no listener is registered for
                # the descriptor.
                if evfilt == select.KQ_FILTER_READ:
                    readers.get(fileno, hub.noop).cb(fileno)
                elif evfilt == select.KQ_FILTER_WRITE:
                    writers.get(fileno, hub.noop).cb(fileno)
            except SYSTEM_EXCEPTIONS:
                raise
            except:  # noqa: E722
                # Deliberately broad: one failing callback must not stop
                # the remaining events from being dispatched.
                self.squelch_exception(fileno, sys.exc_info())
                support.clear_sys_exc_info()