1"""Selector Utilities."""
2from __future__ import absolute_import, unicode_literals
3
4import errno
5import math
6import select as __select__
7import socket
8import sys
9
10from numbers import Integral
11
12from . import fileno
13from .compat import detect_environment
14
15__all__ = ('poll',)
16
# Shortcuts into the stdlib ``select`` module.  Pollers that a platform
# does not provide resolve to ``None``.
_selectf = __select__.select
_selecterr = __select__.error
xpoll = getattr(__select__, 'poll', None)
epoll = getattr(__select__, 'epoll', None)
kqueue = getattr(__select__, 'kqueue', None)
kevent = getattr(__select__, 'kevent', None)

# kqueue event flags (literal fallbacks are used when ``select`` lacks them).
KQ_EV_ADD = getattr(__select__, 'KQ_EV_ADD', 1)
KQ_EV_DELETE = getattr(__select__, 'KQ_EV_DELETE', 2)
KQ_EV_ENABLE = getattr(__select__, 'KQ_EV_ENABLE', 4)
KQ_EV_CLEAR = getattr(__select__, 'KQ_EV_CLEAR', 32)
KQ_EV_ERROR = getattr(__select__, 'KQ_EV_ERROR', 16384)
KQ_EV_EOF = getattr(__select__, 'KQ_EV_EOF', 32768)

# kqueue filters.
KQ_FILTER_READ = getattr(__select__, 'KQ_FILTER_READ', -1)
KQ_FILTER_WRITE = getattr(__select__, 'KQ_FILTER_WRITE', -2)
KQ_FILTER_AIO = getattr(__select__, 'KQ_FILTER_AIO', -3)
KQ_FILTER_VNODE = getattr(__select__, 'KQ_FILTER_VNODE', -4)
KQ_FILTER_PROC = getattr(__select__, 'KQ_FILTER_PROC', -5)
KQ_FILTER_SIGNAL = getattr(__select__, 'KQ_FILTER_SIGNAL', -6)
KQ_FILTER_TIMER = getattr(__select__, 'KQ_FILTER_TIMER', -7)

# kqueue filter-specific notes.
KQ_NOTE_LOWAT = getattr(__select__, 'KQ_NOTE_LOWAT', 1)
KQ_NOTE_DELETE = getattr(__select__, 'KQ_NOTE_DELETE', 1)
KQ_NOTE_WRITE = getattr(__select__, 'KQ_NOTE_WRITE', 2)
KQ_NOTE_EXTEND = getattr(__select__, 'KQ_NOTE_EXTEND', 4)
KQ_NOTE_ATTRIB = getattr(__select__, 'KQ_NOTE_ATTRIB', 8)
KQ_NOTE_LINK = getattr(__select__, 'KQ_NOTE_LINK', 16)
KQ_NOTE_RENAME = getattr(__select__, 'KQ_NOTE_RENAME', 32)
KQ_NOTE_REVOKE = getattr(__select__, 'KQ_NOTE_REVOKE', 64)

# poll(2) event bits.
POLLIN = getattr(__select__, 'POLLIN', 1)
POLLOUT = getattr(__select__, 'POLLOUT', 4)
POLLERR = getattr(__select__, 'POLLERR', 8)
POLLHUP = getattr(__select__, 'POLLHUP', 16)
POLLNVAL = getattr(__select__, 'POLLNVAL', 32)

# Event masks used by the poller classes in this module, independent of
# the backend in use.
POLL_READ = READ = 0x001
POLL_WRITE = WRITE = 0x004
POLL_ERR = ERR = 0x008 | 0x010

# errno values taken to mean "this descriptor is no longer valid".
# ``WSAENOTSOCK`` is added only when the ``errno`` module defines it.
SELECT_BAD_FD = {errno.EBADF}
if hasattr(errno, 'WSAENOTSOCK'):
    SELECT_BAD_FD.add(errno.WSAENOTSOCK)
58
59
class _epoll(object):
    """Poller that delegates to an ``epoll`` object.

    ``register``/``unregister``/``poll`` forward to the wrapped object
    and swallow a small set of errors, as described on each method.
    """

    def __init__(self):
        self._epoll = epoll()

    def register(self, fd, events):
        """Watch ``fd`` for ``events`` and return ``fd``.

        An error whose ``errno`` is ``EEXIST`` is ignored; any other
        exception propagates.
        """
        try:
            self._epoll.register(fd, events)
        except Exception as exc:
            already_known = getattr(exc, 'errno', None) == errno.EEXIST
            if not already_known:
                raise
        return fd

    def unregister(self, fd):
        """Stop watching ``fd`` (best effort).

        ``socket.error``, ``ValueError``, ``KeyError`` and ``TypeError``
        are ignored, as are ``IOError``/``OSError`` carrying ``ENOENT``
        or ``EPERM``.  Other I/O errors propagate.
        """
        try:
            self._epoll.unregister(fd)
        except (socket.error, ValueError, KeyError, TypeError):
            return None
        except (IOError, OSError) as exc:
            if getattr(exc, 'errno', None) in (errno.ENOENT, errno.EPERM):
                return None
            raise

    def poll(self, timeout):
        """Return whatever the wrapped ``poll`` returns.

        A ``timeout`` of ``None`` is passed on as ``-1``.  Returns
        ``None`` when the call fails with ``EINTR``.
        """
        wait = -1 if timeout is None else timeout
        try:
            return self._epoll.poll(wait)
        except Exception as exc:
            if getattr(exc, 'errno', None) == errno.EINTR:
                return None
            raise

    def close(self):
        """Close the wrapped epoll object."""
        self._epoll.close()
91
92
class _kqueue(object):
    """Poller that delegates to a ``kqueue`` object.

    In addition to read/write readiness, descriptors can be watched
    for vnode changes with :meth:`watch_file`.  Vnode events collected
    by :meth:`poll` are handed to the ``on_file_change`` callback.
    """

    # vnode notes requested by watch_file() / unwatch_file().
    w_fflags = (KQ_NOTE_WRITE | KQ_NOTE_EXTEND |
                KQ_NOTE_ATTRIB | KQ_NOTE_DELETE)

    def __init__(self):
        self._kqueue = kqueue()
        # fd -> event mask given to register(); consumed by unregister().
        self._active = {}
        # Optional callable that receives the list of vnode kevents.
        self.on_file_change = None
        self._kcontrol = self._kqueue.control

    def register(self, fd, events):
        """Add filters for ``events`` on ``fd``, remember them, return ``fd``."""
        self._control(fd, events, KQ_EV_ADD)
        self._active[fd] = events
        return fd

    def unregister(self, fd):
        """Delete the filters previously registered for ``fd``.

        Does nothing when ``fd`` is unknown or was registered with an
        empty mask.  ``socket.error`` raised while deleting is ignored.
        """
        events = self._active.pop(fd, None)
        if events:
            try:
                self._control(fd, events, KQ_EV_DELETE)
            except socket.error:
                pass

    def watch_file(self, fd):
        """Ask for the ``w_fflags`` vnode notifications on ``fd``."""
        ev = kevent(fd,
                    filter=KQ_FILTER_VNODE,
                    flags=KQ_EV_ADD | KQ_EV_ENABLE | KQ_EV_CLEAR,
                    fflags=self.w_fflags)
        self._kcontrol([ev], 0)

    def unwatch_file(self, fd):
        """Remove the vnode filter installed by :meth:`watch_file`."""
        ev = kevent(fd,
                    filter=KQ_FILTER_VNODE,
                    flags=KQ_EV_DELETE,
                    fflags=self.w_fflags)
        self._kcontrol([ev], 0)

    def _control(self, fd, events, flags):
        """Apply ``flags`` to the filters selected by the ``events`` mask.

        A write filter is used when ``WRITE`` is set.  A read filter is
        used when ``READ`` is set, and also as the fallback when no
        write filter was selected.  Each change is submitted on its own
        and a ``ValueError`` from the submission is ignored.
        """
        if not events:
            return
        kevents = []
        if events & WRITE:
            kevents.append(kevent(fd,
                           filter=KQ_FILTER_WRITE,
                           flags=flags))
        if not kevents or events & READ:
            kevents.append(
                kevent(fd, filter=KQ_FILTER_READ, flags=flags),
            )
        control = self._kcontrol
        for e in kevents:
            try:
                control([e], 0)
            except ValueError:
                pass

    def poll(self, timeout):
        """Collect up to 1000 pending events.

        Returns a list of ``(fd, mask)`` pairs where ``mask`` combines
        ``READ``, ``WRITE`` and ``ERR``, or ``None`` when the call fails
        with ``EINTR``.  Vnode events are not part of the result; they
        are passed as a list to ``on_file_change`` when that callback
        is set.
        """
        try:
            kevents = self._kcontrol(None, 1000, timeout)
        except Exception as exc:
            if getattr(exc, 'errno', None) == errno.EINTR:
                return
            raise
        events, file_changes = {}, []
        for k in kevents:
            fd = k.ident
            if k.flags & KQ_EV_ERROR:
                # KQ_EV_ERROR is a bit in ``flags``, not a filter id, so
                # it has to be tested on ``k.flags``.
                events[fd] = events.get(fd, 0) | ERR
            elif k.filter == KQ_FILTER_READ:
                events[fd] = events.get(fd, 0) | READ
            elif k.filter == KQ_FILTER_WRITE:
                if k.flags & KQ_EV_EOF:
                    # EOF on the write side replaces anything collected
                    # so far for this fd.
                    events[fd] = ERR
                else:
                    events[fd] = events.get(fd, 0) | WRITE
            elif k.filter == KQ_FILTER_VNODE:
                if k.fflags & KQ_NOTE_DELETE:
                    self.unregister(fd)
                file_changes.append(k)
        # The callback defaults to None; only notify when one was set.
        if file_changes and self.on_file_change is not None:
            self.on_file_change(file_changes)
        return list(events.items())

    def close(self):
        """Close the wrapped kqueue object."""
        self._kqueue.close()
178
179
class _poll(object):
    """Poller that delegates to a ``poll`` object."""

    def __init__(self):
        self._poller = xpoll()
        # Bound methods are cached on the instance.
        self._quick_poll = self._poller.poll
        self._quick_register = self._poller.register
        self._quick_unregister = self._poller.unregister

    def register(self, fd, events):
        """Translate the ``events`` mask to poll flags and register ``fd``.

        Returns the value of ``fileno(fd)``.
        """
        fd = fileno(fd)
        translation = ((ERR, POLLERR), (WRITE, POLLOUT), (READ, POLLIN))
        poll_flags = 0
        for mask, flag in translation:
            if events & mask:
                poll_flags |= flag
        self._quick_register(fd, poll_flags)
        return fd

    def unregister(self, fd):
        """Unregister ``fd`` and return it.

        When ``fileno(fd)`` fails with an errno listed in
        ``SELECT_BAD_FD`` the original ``fd`` argument is returned
        without touching the poller; other errors propagate.
        """
        try:
            fd = fileno(fd)
        except socket.error as exc:
            if getattr(exc, 'errno', None) not in SELECT_BAD_FD:
                raise
            # we don't know the previous fd of this object
            # but it will be removed by the next poll iteration.
            return fd
        self._quick_unregister(fd)
        return fd

    def poll(self, timeout, round=math.ceil,
             POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
             READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
        """Poll once and return a list of ``(fd, mask)`` pairs.

        ``timeout`` is multiplied by ``1e3`` and passed through
        ``round`` (``math.ceil`` by default); a negative or falsy
        ``timeout`` becomes ``0``.  Returns ``None`` when the call
        fails with ``EINTR``.  The remaining keyword parameters only
        bind module-level names as defaults.
        """
        if timeout and timeout < 0:
            timeout = 0
        else:
            timeout = round((timeout or 0) * 1e3)

        try:
            event_list = self._quick_poll(timeout)
        except (_selecterr, socket.error) as exc:
            if getattr(exc, 'errno', None) != errno.EINTR:
                raise
            return

        failure_bits = POLLERR | POLLNVAL | POLLHUP
        ready = []
        for fd, event in event_list:
            events = 0
            if event & POLLIN:
                events |= READ
            if event & POLLOUT:
                events |= WRITE
            if event & failure_bits:
                events |= ERR
            assert events
            if not isinstance(fd, Integral):
                fd = fd.fileno()
            ready.append((fd, events))
        return ready

    def close(self):
        """Drop the reference to the wrapped poll object."""
        self._poller = None
240
241
class _select(object):
    """Poller built on ``select()`` and three descriptor sets."""

    def __init__(self):
        self._rfd, self._wfd, self._efd = set(), set(), set()
        # Same set objects, grouped as (read, write, error).
        self._all = (self._rfd, self._wfd, self._efd)

    def register(self, fd, events):
        """Add ``fileno(fd)`` to each set selected by ``events``; return it."""
        fd = fileno(fd)
        for mask, fds in ((ERR, self._efd),
                          (WRITE, self._wfd),
                          (READ, self._rfd)):
            if events & mask:
                fds.add(fd)
        return fd

    def _remove_bad(self):
        """Probe every known fd and unregister those reported as bad."""
        known = self._rfd | self._wfd | self._efd
        for fd in known:
            try:
                _selectf([fd], [], [], 0)
            except (_selecterr, socket.error) as exc:
                if getattr(exc, 'errno', None) in SELECT_BAD_FD:
                    self.unregister(fd)

    def unregister(self, fd):
        """Remove ``fd`` from all three sets.

        When ``fileno(fd)`` fails with an errno listed in
        ``SELECT_BAD_FD`` nothing is removed; other errors propagate.
        """
        try:
            fd = fileno(fd)
        except socket.error as exc:
            if getattr(exc, 'errno', None) not in SELECT_BAD_FD:
                raise
            # we don't know the previous fd of this object
            # but it will be removed by the next poll iteration.
            return
        for fds in (self._rfd, self._wfd, self._efd):
            fds.discard(fd)

    def poll(self, timeout):
        """Run ``select()`` once and return a list of ``(fd, mask)`` pairs.

        Returns ``None`` on ``EINTR``.  On an errno listed in
        ``SELECT_BAD_FD`` the bad descriptors are pruned and ``None``
        is returned as well.
        """
        try:
            read, write, error = _selectf(
                self._rfd, self._wfd, self._efd, timeout,
            )
        except (_selecterr, socket.error) as exc:
            code = getattr(exc, 'errno', None)
            if code == errno.EINTR:
                return
            if code in SELECT_BAD_FD:
                return self._remove_bad()
            raise

        events = {}
        for ready, mask in ((read, READ), (write, WRITE), (error, ERR)):
            for fd in ready:
                if not isinstance(fd, Integral):
                    fd = fd.fileno()
                events[fd] = events.get(fd, 0) | mask
        return list(events.items())

    def close(self):
        """Forget every registered descriptor."""
        for fds in (self._rfd, self._wfd, self._efd):
            fds.clear()
311
312
def _get_poller():
    """Return the poller class to use in the current environment.

    Order of preference: ``_select`` whenever ``detect_environment()``
    is not ``'default'``; otherwise ``_epoll`` if available, ``_kqueue``
    if available and the platform name contains ``'netbsd'``, ``_poll``
    if available, and ``_select`` as the last resort.
    """
    if detect_environment() != 'default':
        # greenlet
        return _select
    if epoll:
        # Py2.6+ Linux
        return _epoll
    if kqueue and 'netbsd' in sys.platform:
        return _kqueue
    return _poll if xpoll else _select
326
327
def poll(*args, **kwargs):
    """Create new poller instance.

    All arguments are forwarded to the class chosen by ``_get_poller()``.
    """
    poller_cls = _get_poller()
    return poller_cls(*args, **kwargs)
331