# Backport of selectors.py from Python 3.5+ to support Python < 3.4
# Also has the behavior specified in PEP 475 which is to retry syscalls
# in the case of an EINTR error. This module is required because selectors34
# does not follow this behavior and instead returns that no file descriptor
# events have occurred rather than retry the syscall. The decision to drop
# support for select.devpoll is made to maintain 100% test coverage.
7
8import errno
9import math
10import select
11from collections import namedtuple
12
13try:
14    from collections.abc import Mapping
15except ImportError:
16    from collections import Mapping
17
18import time
19try:
20    monotonic = time.monotonic
21except (AttributeError, ImportError):  # Python 3.3<
22    monotonic = time.time
23
# Bit flags naming the I/O events a file object can be monitored for.
EVENT_READ = 1 << 0
EVENT_WRITE = 1 << 1

# Variable that shows whether the platform has a selector.
HAS_SELECT = True
# Unique "no result yet" marker; needed because a system call may return None.
_SYSCALL_SENTINEL = object()
29
30
class SelectorError(Exception):
    """ Exception carrying the error code of a failed selector call.

    The code given to the constructor is stored in the 'errno'
    attribute; the exception itself is created without arguments. """

    def __init__(self, errcode):
        super(SelectorError, self).__init__()
        self.errno = errcode

    def __repr__(self):
        template = "<SelectorError errno={0}>"
        return template.format(self.errno)

    def __str__(self):
        # The string form is intentionally the same as the repr.
        return repr(self)
41
42
def _fileobj_to_fd(fileobj):
    """ Resolve a file object to its integer file descriptor.

    An integer is taken to be a file descriptor already; any other
    object must offer a fileno() method whose result converts to int.
    Raises ValueError if no descriptor can be obtained or if the
    descriptor is negative. """
    if not isinstance(fileobj, int):
        try:
            descriptor = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            raise ValueError("Invalid file object: {0!r}".format(fileobj))
    else:
        descriptor = fileobj

    if descriptor >= 0:
        return descriptor
    raise ValueError("Invalid file descriptor: {0}".format(descriptor))
56
57
def _syscall_wrapper(func, recalc_timeout, *args, **kwargs):
    """ Wrapper function for syscalls that could fail due to EINTR.
    All functions should be retried if there is time left in the timeout
    in accordance with PEP 475.

    'func' is called as func(*args, **kwargs) until it returns. Only a
    timeout passed as the 'timeout' keyword argument is seen here: it
    fixes the deadline for retries and, when 'recalc_timeout' is true,
    is replaced by the remaining time before each retry. A timeout passed
    positionally is forwarded untouched. A negative timeout is treated
    as no timeout.

    Returns the result of 'func'. Raises OSError with errno ETIMEDOUT if
    the call is interrupted after the deadline has passed, SelectorError
    for any other failure that carries a non-zero error code, and
    re-raises the original exception when no error code is available. """
    timeout = kwargs.get("timeout", None)
    if timeout is None:
        # Without a keyword timeout there is nothing to recalculate.
        expires = None
        recalc_timeout = False
    else:
        timeout = float(timeout)
        if timeout < 0.0:  # Timeout less than 0 treated as no timeout.
            expires = None
        else:
            expires = monotonic() + timeout

    result = _SYSCALL_SENTINEL
    while result is _SYSCALL_SENTINEL:
        try:
            result = func(*args, **kwargs)
        # OSError is thrown by select.select
        # IOError is thrown by select.epoll.poll
        # select.error is thrown by select.poll.poll
        # Aren't we thankful for Python 3.x rework for exceptions?
        except (OSError, IOError, select.error) as e:
            # select.error wasn't a subclass of OSError in the past, so the
            # code may only be available as the first exception argument.
            errcode = None
            if hasattr(e, "errno"):
                errcode = e.errno
            elif getattr(e, "args", None):  # Guard against empty args.
                errcode = e.args[0]

            # Also test for the Windows equivalent of EINTR.
            is_interrupt = (errcode == errno.EINTR or
                            (hasattr(errno, "WSAEINTR") and
                             errcode == errno.WSAEINTR))

            if is_interrupt:
                if expires is not None:
                    current_time = monotonic()
                    if current_time > expires:
                        # OSError accepts positional arguments only, and
                        # needs both code and message to populate .errno.
                        raise OSError(errno.ETIMEDOUT,
                                      "Timed out while retrying an "
                                      "interrupted system call")
                    if recalc_timeout:
                        # recalc_timeout is only still true when the
                        # timeout came in through kwargs.
                        kwargs["timeout"] = expires - current_time
                continue
            if errcode:
                raise SelectorError(errcode)
            raise
    return result
112
113
# Record tying a registered file object to its descriptor, the events it is
# monitored for, and the opaque data attached at registration time.
SelectorKey = namedtuple('SelectorKey', 'fileobj fd events data')
115
116
class _SelectorMapping(Mapping):
    """ Read-only view mapping file objects to their selector keys.

    The view holds no data of its own; every operation is answered
    from the '_fd_to_key' dictionary of the selector it wraps. """

    def __init__(self, selector):
        self._selector = selector

    def __iter__(self):
        return iter(self._selector._fd_to_key)

    def __len__(self):
        return len(self._selector._fd_to_key)

    def __getitem__(self, fileobj):
        owner = self._selector
        try:
            fd = owner._fileobj_lookup(fileobj)
            found = owner._fd_to_key[fd]
        except KeyError:
            raise KeyError("{0!r} is not registered.".format(fileobj))
        else:
            return found
135
136
class BaseSelector(object):
    """ Abstract Selector class

    A selector supports registering file objects to be monitored
    for specific I/O events.

    A file object is a file descriptor or any object with a
    `fileno()` method. An arbitrary object can be attached to the
    file object which can be used for example to store context info,
    a callback, etc.

    A selector can use various implementations (select(), poll(), epoll(),
    and kqueue()) depending on the platform. The 'DefaultSelector' class uses
    the most efficient implementation for the current platform.
    """
    def __init__(self):
        # Maps file descriptors to keys.
        self._fd_to_key = {}

        # Read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    @staticmethod
    def _validate_events(events):
        """ Raise ValueError unless 'events' is a non-empty combination
        of EVENT_READ and EVENT_WRITE. """
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError("Invalid events: {0!r}".format(events))

    def _fileobj_lookup(self, fileobj):
        """ Return a file descriptor from a file object.
        This wraps _fileobj_to_fd() to do an exhaustive
        search in case the object is invalid but we still
        have it in our map. Used by unregister() so we can
        unregister an object that was previously registered
        even if it is closed. It is also used by _SelectorMapping
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:

            # Search through all our mapped keys.
            for key in self._fd_to_key.values():
                if key.fileobj is fileobj:
                    return key.fd

            # Raise ValueError after all.
            raise

    def register(self, fileobj, events, data=None):
        """ Register a file object for a set of events to monitor.

        Returns the SelectorKey created for the file object. Raises
        ValueError for an invalid event mask or file object and KeyError
        if the file descriptor is already registered. """
        self._validate_events(events)

        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError("{0!r} (FD {1}) is already registered"
                           .format(fileobj, key.fd))

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        """ Unregister a file object from being monitored.

        Returns the SelectorKey that was removed. Raises KeyError if
        the file object is not registered. """
        try:
            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))
        return key

    def modify(self, fileobj, events, data=None):
        """ Change a registered file object monitored events and data.

        Returns the resulting SelectorKey. Raises KeyError if the file
        object is not registered and ValueError for an invalid event
        mask, in which case the existing registration is left intact. """
        # NOTE: Some subclasses optimize this operation even further.
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))

        if events != key.events:
            # Reject a bad event mask *before* unregistering so a failed
            # modify() does not silently drop the existing registration.
            self._validate_events(events)
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)

        elif data != key.data:
            # Use a shortcut to update the data.
            key = key._replace(data=data)
            self._fd_to_key[key.fd] = key

        return key

    def select(self, timeout=None):
        """ Perform the actual selection until some monitored file objects
        are ready or the timeout expires. """
        raise NotImplementedError()

    def close(self):
        """ Close the selector. This must be called to ensure that all
        underlying resources are freed. """
        self._fd_to_key.clear()
        # get_key() treats a missing map as "selector is closed".
        self._map = None

    def get_key(self, fileobj):
        """ Return the key associated with a registered file object.

        Raises RuntimeError if the selector has been closed and KeyError
        if the file object is not registered. """
        mapping = self.get_map()
        if mapping is None:
            raise RuntimeError("Selector is closed")
        try:
            return mapping[fileobj]
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))

    def get_map(self):
        """ Return a mapping of file objects to selector keys """
        return self._map

    def _key_from_fd(self, fd):
        """ Return the key associated to a given file descriptor
         Return None if it is not found. """
        try:
            return self._fd_to_key[fd]
        except KeyError:
            return None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
258
259
# Almost all platforms have select.select()
if hasattr(select, "select"):
    class SelectSelector(BaseSelector):
        """ Select-based selector. """
        def __init__(self):
            super(SelectSelector, self).__init__()
            # File descriptors monitored for reading / writing.
            self._readers = set()
            self._writers = set()

        def register(self, fileobj, events, data=None):
            """ Register a file object and record its descriptor in the
            reader and/or writer set according to 'events'. """
            key = super(SelectSelector, self).register(fileobj, events, data)
            if events & EVENT_READ:
                self._readers.add(key.fd)
            if events & EVENT_WRITE:
                self._writers.add(key.fd)
            return key

        def unregister(self, fileobj):
            """ Unregister a file object and drop its descriptor from
            both the reader and the writer set. """
            key = super(SelectSelector, self).unregister(fileobj)
            self._readers.discard(key.fd)
            self._writers.discard(key.fd)
            return key

        def _select(self, r, w, timeout=None):
            """ Wrapper for select.select because timeout is a positional arg """
            return select.select(r, w, [], timeout)

        def select(self, timeout=None):
            """ Wait until a registered descriptor is ready or the timeout
            (in seconds, None meaning no timeout) expires.

            Returns a list of (key, events) tuples. Returns an empty list
            right away when nothing is registered. """
            # Selecting on empty lists on Windows errors out.
            if not self._readers and not self._writers:
                return []

            timeout = None if timeout is None else max(timeout, 0.0)
            ready = []
            # The timeout must be passed by keyword: _syscall_wrapper only
            # tracks and recalculates a 'timeout' keyword argument, so a
            # positional one would restart in full after every EINTR.
            r, w, _ = _syscall_wrapper(self._select, True, self._readers,
                                       self._writers, timeout=timeout)
            r = set(r)
            w = set(w)
            for fd in r | w:
                events = 0
                if fd in r:
                    events |= EVENT_READ
                if fd in w:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready
309
310
if hasattr(select, "poll"):
    class PollSelector(BaseSelector):
        """ Poll-based selector """
        def __init__(self):
            super(PollSelector, self).__init__()
            self._poll = select.poll()

        def register(self, fileobj, events, data=None):
            """ Register a file object and add its descriptor to the
            poll object with the matching POLLIN / POLLOUT flags. """
            key = super(PollSelector, self).register(fileobj, events, data)
            poll_flags = ((select.POLLIN if events & EVENT_READ else 0) |
                          (select.POLLOUT if events & EVENT_WRITE else 0))
            self._poll.register(key.fd, poll_flags)
            return key

        def unregister(self, fileobj):
            """ Unregister a file object and remove its descriptor from
            the poll object. """
            key = super(PollSelector, self).unregister(fileobj)
            self._poll.unregister(key.fd)
            return key

        def _wrap_poll(self, timeout=None):
            """ Call select.poll.poll() with a timeout given in seconds,
            so that _syscall_wrapper can work with only seconds. """
            if timeout is None:
                millis = None
            elif timeout <= 0:
                millis = 0
            else:
                # select.poll.poll() has a resolution of 1 millisecond,
                # round away from zero to wait *at least* timeout seconds.
                millis = math.ceil(timeout * 1e3)
            return self._poll.poll(millis)

        def select(self, timeout=None):
            """ Poll the registered descriptors and return a list of
            (key, events) tuples for the ones that are still registered. """
            polled = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)

            ready = []
            for fd, flags in polled:
                key = self._key_from_fd(fd)
                if not key:
                    continue

                # Any flag other than POLLIN counts as writable and any
                # flag other than POLLOUT counts as readable.
                triggered = 0
                if flags & ~select.POLLIN:
                    triggered |= EVENT_WRITE
                if flags & ~select.POLLOUT:
                    triggered |= EVENT_READ

                ready.append((key, triggered & key.events))
            return ready
362
363
if hasattr(select, "epoll"):
    class EpollSelector(BaseSelector):
        """ Epoll-based selector """
        def __init__(self):
            super(EpollSelector, self).__init__()
            self._epoll = select.epoll()

        def fileno(self):
            """ Return the file descriptor of the underlying epoll object. """
            return self._epoll.fileno()

        def register(self, fileobj, events, data=None):
            """ Register a file object and add its descriptor to the epoll
            object with the matching EPOLLIN / EPOLLOUT flags.

            If the epoll registration fails, the file object is removed
            from the selector's own bookkeeping again before the error
            is propagated. """
            key = super(EpollSelector, self).register(fileobj, events, data)
            events_mask = 0
            if events & EVENT_READ:
                events_mask |= select.EPOLLIN
            if events & EVENT_WRITE:
                events_mask |= select.EPOLLOUT
            try:
                _syscall_wrapper(self._epoll.register, False,
                                 key.fd, events_mask)
            except BaseException:
                # Roll back so a failed registration does not leave a stale
                # key behind (which would make a later register() report
                # "already registered"). The error is always re-raised.
                super(EpollSelector, self).unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """ Unregister a file object and remove its descriptor from
            the epoll object, ignoring a SelectorError from the latter. """
            key = super(EpollSelector, self).unregister(fileobj)
            try:
                _syscall_wrapper(self._epoll.unregister, False, key.fd)
            except SelectorError:
                # This can occur when the fd was closed since registry.
                pass
            return key

        def select(self, timeout=None):
            """ Wait until a registered descriptor is ready or the timeout
            (in seconds, None meaning no timeout) expires.

            Returns a list of (key, events) tuples. """
            if timeout is not None:
                if timeout <= 0:
                    timeout = 0.0
                else:
                    # select.epoll.poll() has a resolution of 1 millisecond
                    # but luckily takes seconds so we don't need a wrapper
                    # like PollSelector. Just for better rounding.
                    timeout = math.ceil(timeout * 1e3) * 1e-3
                timeout = float(timeout)
            else:
                timeout = -1.0  # epoll.poll() must have a float.

            # We always want at least 1 to ensure that select can be called
            # with no file descriptors registered. Otherwise will fail.
            max_events = max(len(self._fd_to_key), 1)

            ready = []
            fd_events = _syscall_wrapper(self._epoll.poll, True,
                                         timeout=timeout,
                                         maxevents=max_events)
            for fd, event_mask in fd_events:
                # Any flag other than EPOLLIN counts as writable and any
                # flag other than EPOLLOUT counts as readable.
                events = 0
                if event_mask & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event_mask & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """ Close the epoll object, then clear the selector state. """
            self._epoll.close()
            super(EpollSelector, self).close()
429
430
if hasattr(select, "kqueue"):
    class KqueueSelector(BaseSelector):
        """ Kqueue / Kevent-based selector """
        def __init__(self):
            super(KqueueSelector, self).__init__()
            self._kqueue = select.kqueue()

        def fileno(self):
            """ Return the file descriptor of the underlying kqueue object. """
            return self._kqueue.fileno()

        def register(self, fileobj, events, data=None):
            """ Register a file object and add one kevent filter per
            requested event to the kqueue object.

            If adding a filter fails, the file object is removed from the
            selector's own bookkeeping again before the error is
            propagated. """
            key = super(KqueueSelector, self).register(fileobj, events, data)
            try:
                if events & EVENT_READ:
                    kevent = select.kevent(key.fd,
                                           select.KQ_FILTER_READ,
                                           select.KQ_EV_ADD)

                    _syscall_wrapper(self._kqueue.control, False,
                                     [kevent], 0, 0)

                if events & EVENT_WRITE:
                    kevent = select.kevent(key.fd,
                                           select.KQ_FILTER_WRITE,
                                           select.KQ_EV_ADD)

                    _syscall_wrapper(self._kqueue.control, False,
                                     [kevent], 0, 0)
            except BaseException:
                # Roll back so a failed registration does not leave a stale
                # key behind. The error is always re-raised.
                super(KqueueSelector, self).unregister(fileobj)
                raise

            return key

        def unregister(self, fileobj):
            """ Unregister a file object and delete its kevent filters,
            ignoring a SelectorError raised while deleting them. """
            key = super(KqueueSelector, self).unregister(fileobj)
            if key.events & EVENT_READ:
                kevent = select.kevent(key.fd,
                                       select.KQ_FILTER_READ,
                                       select.KQ_EV_DELETE)
                try:
                    _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
                except SelectorError:
                    pass
            if key.events & EVENT_WRITE:
                kevent = select.kevent(key.fd,
                                       select.KQ_FILTER_WRITE,
                                       select.KQ_EV_DELETE)
                try:
                    _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
                except SelectorError:
                    pass

            return key

        def _wrap_control(self, changelist, max_events, timeout=None):
            """ Wrapper for kqueue.control so that the timeout can be given
            as a keyword argument, which _syscall_wrapper needs in order
            to track and recalculate it. """
            return self._kqueue.control(changelist, max_events, timeout)

        def select(self, timeout=None):
            """ Wait until a registered descriptor is ready or the timeout
            (in seconds, None meaning no timeout) expires.

            Returns a list of (key, events) tuples, one per ready file
            descriptor. """
            if timeout is not None:
                timeout = max(timeout, 0)

            # Each descriptor can report a read and a write event. Ask for
            # at least one event: with max_events == 0 kqueue ignores the
            # timeout, unlike the other selector classes.
            max_events = max(len(self._fd_to_key) * 2, 1)
            ready_fds = {}

            kevent_list = _syscall_wrapper(self._wrap_control, True,
                                           None, max_events, timeout=timeout)

            for kevent in kevent_list:
                fd = kevent.ident
                event_mask = kevent.filter
                events = 0
                if event_mask == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if event_mask == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    # Read and write readiness arrive as separate kevents;
                    # merge them into a single entry per descriptor.
                    if key.fd not in ready_fds:
                        ready_fds[key.fd] = (key, events & key.events)
                    else:
                        old_events = ready_fds[key.fd][1]
                        ready_fds[key.fd] = (key, (events | old_events) & key.events)

            return list(ready_fds.values())

        def close(self):
            """ Close the kqueue object, then clear the selector state. """
            self._kqueue.close()
            super(KqueueSelector, self).close()
512
513
# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():  # Platform-specific: Mac OS and BSD
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():  # Platform-specific: Linux
    DefaultSelector = EpollSelector
elif 'PollSelector' in globals():  # Platform-specific: Linux
    DefaultSelector = PollSelector
elif 'SelectSelector' in globals():  # Platform-specific: Windows
    DefaultSelector = SelectSelector
else:  # Platform-specific: AppEngine
    def no_selector(*_args, **_kwargs):
        """ Stand-in for DefaultSelector on platforms without a selector.

        Accepts any arguments so that it fails with the intended
        ValueError however it is called, including with no arguments
        the way the selector classes are constructed. """
        raise ValueError("Platform does not have a selector")
    DefaultSelector = no_selector
    HAS_SELECT = False
530