1import os
2import sys
3from eventlet import patcher, support
4from eventlet.hubs import hub
5import six
6select = patcher.original('select')
7time = patcher.original('time')
8
9
10def is_available():
11    return hasattr(select, 'kqueue')
12
13
14class Hub(hub.BaseHub):
15    MAX_EVENTS = 100
16
17    def __init__(self, clock=None):
18        self.FILTERS = {
19            hub.READ: select.KQ_FILTER_READ,
20            hub.WRITE: select.KQ_FILTER_WRITE,
21        }
22        super(Hub, self).__init__(clock)
23        self._events = {}
24        self._init_kqueue()
25
26    def _init_kqueue(self):
27        self.kqueue = select.kqueue()
28        self._pid = os.getpid()
29
30    def _reinit_kqueue(self):
31        self.kqueue.close()
32        self._init_kqueue()
33        events = [e for i in six.itervalues(self._events)
34                  for e in six.itervalues(i)]
35        self.kqueue.control(events, 0, 0)
36
37    def _control(self, events, max_events, timeout):
38        try:
39            return self.kqueue.control(events, max_events, timeout)
40        except (OSError, IOError):
41            # have we forked?
42            if os.getpid() != self._pid:
43                self._reinit_kqueue()
44                return self.kqueue.control(events, max_events, timeout)
45            raise
46
47    def add(self, evtype, fileno, cb, tb, mac):
48        listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
49        events = self._events.setdefault(fileno, {})
50        if evtype not in events:
51            try:
52                event = select.kevent(fileno, self.FILTERS.get(evtype), select.KQ_EV_ADD)
53                self._control([event], 0, 0)
54                events[evtype] = event
55            except ValueError:
56                super(Hub, self).remove(listener)
57                raise
58        return listener
59
60    def _delete_events(self, events):
61        del_events = [
62            select.kevent(e.ident, e.filter, select.KQ_EV_DELETE)
63            for e in events
64        ]
65        self._control(del_events, 0, 0)
66
67    def remove(self, listener):
68        super(Hub, self).remove(listener)
69        evtype = listener.evtype
70        fileno = listener.fileno
71        if not self.listeners[evtype].get(fileno):
72            event = self._events[fileno].pop(evtype, None)
73            if event is None:
74                return
75            try:
76                self._delete_events((event,))
77            except OSError:
78                pass
79
80    def remove_descriptor(self, fileno):
81        super(Hub, self).remove_descriptor(fileno)
82        try:
83            events = self._events.pop(fileno).values()
84            self._delete_events(events)
85        except KeyError:
86            pass
87        except OSError:
88            pass
89
90    def wait(self, seconds=None):
91        readers = self.listeners[self.READ]
92        writers = self.listeners[self.WRITE]
93
94        if not readers and not writers:
95            if seconds:
96                time.sleep(seconds)
97            return
98        result = self._control([], self.MAX_EVENTS, seconds)
99        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
100        for event in result:
101            fileno = event.ident
102            evfilt = event.filter
103            try:
104                if evfilt == select.KQ_FILTER_READ:
105                    readers.get(fileno, hub.noop).cb(fileno)
106                if evfilt == select.KQ_FILTER_WRITE:
107                    writers.get(fileno, hub.noop).cb(fileno)
108            except SYSTEM_EXCEPTIONS:
109                raise
110            except:
111                self.squelch_exception(fileno, sys.exc_info())
112                support.clear_sys_exc_info()
113