1# pylint: skip-file
2# vendored from https://github.com/berkerpeksag/selectors34
3# at commit ff61b82168d2cc9c4922ae08e2a8bf94aab61ea2 (unreleased, ~1.2)
4#
5# Original author: Charles-Francois Natali (c.f.natali[at]gmail.com)
6# Maintainer: Berker Peksag (berker.peksag[at]gmail.com)
7# Also see https://pypi.python.org/pypi/selectors34
8"""Selectors module.
9
10This module allows high-level and efficient I/O multiplexing, built upon the
11`select` module primitives.
12
13The following code adapted from trollius.selectors.
14"""
15from __future__ import absolute_import
16
17from abc import ABCMeta, abstractmethod
18from collections import namedtuple, Mapping
19from errno import EINTR
20import math
21import select
22import sys
23
24from kafka.vendor import six
25
26
27def _wrap_error(exc, mapping, key):
28    if key not in mapping:
29        return
30    new_err_cls = mapping[key]
31    new_err = new_err_cls(*exc.args)
32
33    # raise a new exception with the original traceback
34    if hasattr(exc, '__traceback__'):
35        traceback = exc.__traceback__
36    else:
37        traceback = sys.exc_info()[2]
38    six.reraise(new_err_cls, new_err, traceback)
39
40
41# generic events, that must be mapped to implementation-specific ones
42EVENT_READ = (1 << 0)
43EVENT_WRITE = (1 << 1)
44
45
46def _fileobj_to_fd(fileobj):
47    """Return a file descriptor from a file object.
48
49    Parameters:
50    fileobj -- file object or file descriptor
51
52    Returns:
53    corresponding file descriptor
54
55    Raises:
56    ValueError if the object is invalid
57    """
58    if isinstance(fileobj, six.integer_types):
59        fd = fileobj
60    else:
61        try:
62            fd = int(fileobj.fileno())
63        except (AttributeError, TypeError, ValueError):
64            raise ValueError("Invalid file object: "
65                             "{0!r}".format(fileobj))
66    if fd < 0:
67        raise ValueError("Invalid file descriptor: {0}".format(fd))
68    return fd
69
70
71SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
72"""Object used to associate a file object to its backing file descriptor,
73selected event mask and attached data."""
74
75
76class _SelectorMapping(Mapping):
77    """Mapping of file objects to selector keys."""
78
79    def __init__(self, selector):
80        self._selector = selector
81
82    def __len__(self):
83        return len(self._selector._fd_to_key)
84
85    def __getitem__(self, fileobj):
86        try:
87            fd = self._selector._fileobj_lookup(fileobj)
88            return self._selector._fd_to_key[fd]
89        except KeyError:
90            raise KeyError("{0!r} is not registered".format(fileobj))
91
92    def __iter__(self):
93        return iter(self._selector._fd_to_key)
94
95# Using six.add_metaclass() decorator instead of six.with_metaclass() because
96# the latter leaks temporary_class to garbage with gc disabled
97@six.add_metaclass(ABCMeta)
98class BaseSelector(object):
99    """Selector abstract base class.
100
101    A selector supports registering file objects to be monitored for specific
102    I/O events.
103
104    A file object is a file descriptor or any object with a `fileno()` method.
105    An arbitrary object can be attached to the file object, which can be used
106    for example to store context information, a callback, etc.
107
108    A selector can use various implementations (select(), poll(), epoll()...)
109    depending on the platform. The default `Selector` class uses the most
110    efficient implementation on the current platform.
111    """
112
113    @abstractmethod
114    def register(self, fileobj, events, data=None):
115        """Register a file object.
116
117        Parameters:
118        fileobj -- file object or file descriptor
119        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
120        data    -- attached data
121
122        Returns:
123        SelectorKey instance
124
125        Raises:
126        ValueError if events is invalid
127        KeyError if fileobj is already registered
128        OSError if fileobj is closed or otherwise is unacceptable to
129                the underlying system call (if a system call is made)
130
131        Note:
132        OSError may or may not be raised
133        """
134        raise NotImplementedError
135
136    @abstractmethod
137    def unregister(self, fileobj):
138        """Unregister a file object.
139
140        Parameters:
141        fileobj -- file object or file descriptor
142
143        Returns:
144        SelectorKey instance
145
146        Raises:
147        KeyError if fileobj is not registered
148
149        Note:
150        If fileobj is registered but has since been closed this does
151        *not* raise OSError (even if the wrapped syscall does)
152        """
153        raise NotImplementedError
154
155    def modify(self, fileobj, events, data=None):
156        """Change a registered file object monitored events or attached data.
157
158        Parameters:
159        fileobj -- file object or file descriptor
160        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
161        data    -- attached data
162
163        Returns:
164        SelectorKey instance
165
166        Raises:
167        Anything that unregister() or register() raises
168        """
169        self.unregister(fileobj)
170        return self.register(fileobj, events, data)
171
172    @abstractmethod
173    def select(self, timeout=None):
174        """Perform the actual selection, until some monitored file objects are
175        ready or a timeout expires.
176
177        Parameters:
178        timeout -- if timeout > 0, this specifies the maximum wait time, in
179                   seconds
180                   if timeout <= 0, the select() call won't block, and will
181                   report the currently ready file objects
182                   if timeout is None, select() will block until a monitored
183                   file object becomes ready
184
185        Returns:
186        list of (key, events) for ready file objects
187        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
188        """
189        raise NotImplementedError
190
191    def close(self):
192        """Close the selector.
193
194        This must be called to make sure that any underlying resource is freed.
195        """
196        pass
197
198    def get_key(self, fileobj):
199        """Return the key associated to a registered file object.
200
201        Returns:
202        SelectorKey for this file object
203        """
204        mapping = self.get_map()
205        if mapping is None:
206            raise RuntimeError('Selector is closed')
207        try:
208            return mapping[fileobj]
209        except KeyError:
210            raise KeyError("{0!r} is not registered".format(fileobj))
211
212    @abstractmethod
213    def get_map(self):
214        """Return a mapping of file objects to selector keys."""
215        raise NotImplementedError
216
217    def __enter__(self):
218        return self
219
220    def __exit__(self, *args):
221        self.close()
222
223
224class _BaseSelectorImpl(BaseSelector):
225    """Base selector implementation."""
226
227    def __init__(self):
228        # this maps file descriptors to keys
229        self._fd_to_key = {}
230        # read-only mapping returned by get_map()
231        self._map = _SelectorMapping(self)
232
233    def _fileobj_lookup(self, fileobj):
234        """Return a file descriptor from a file object.
235
236        This wraps _fileobj_to_fd() to do an exhaustive search in case
237        the object is invalid but we still have it in our map.  This
238        is used by unregister() so we can unregister an object that
239        was previously registered even if it is closed.  It is also
240        used by _SelectorMapping.
241        """
242        try:
243            return _fileobj_to_fd(fileobj)
244        except ValueError:
245            # Do an exhaustive search.
246            for key in self._fd_to_key.values():
247                if key.fileobj is fileobj:
248                    return key.fd
249            # Raise ValueError after all.
250            raise
251
252    def register(self, fileobj, events, data=None):
253        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
254            raise ValueError("Invalid events: {0!r}".format(events))
255
256        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
257
258        if key.fd in self._fd_to_key:
259            raise KeyError("{0!r} (FD {1}) is already registered"
260                           .format(fileobj, key.fd))
261
262        self._fd_to_key[key.fd] = key
263        return key
264
265    def unregister(self, fileobj):
266        try:
267            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
268        except KeyError:
269            raise KeyError("{0!r} is not registered".format(fileobj))
270        return key
271
272    def modify(self, fileobj, events, data=None):
273        # TODO: Subclasses can probably optimize this even further.
274        try:
275            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
276        except KeyError:
277            raise KeyError("{0!r} is not registered".format(fileobj))
278        if events != key.events:
279            self.unregister(fileobj)
280            key = self.register(fileobj, events, data)
281        elif data != key.data:
282            # Use a shortcut to update the data.
283            key = key._replace(data=data)
284            self._fd_to_key[key.fd] = key
285        return key
286
287    def close(self):
288        self._fd_to_key.clear()
289        self._map = None
290
291    def get_map(self):
292        return self._map
293
294    def _key_from_fd(self, fd):
295        """Return the key associated to a given file descriptor.
296
297        Parameters:
298        fd -- file descriptor
299
300        Returns:
301        corresponding key, or None if not found
302        """
303        try:
304            return self._fd_to_key[fd]
305        except KeyError:
306            return None
307
308
309class SelectSelector(_BaseSelectorImpl):
310    """Select-based selector."""
311
312    def __init__(self):
313        super(SelectSelector, self).__init__()
314        self._readers = set()
315        self._writers = set()
316
317    def register(self, fileobj, events, data=None):
318        key = super(SelectSelector, self).register(fileobj, events, data)
319        if events & EVENT_READ:
320            self._readers.add(key.fd)
321        if events & EVENT_WRITE:
322            self._writers.add(key.fd)
323        return key
324
325    def unregister(self, fileobj):
326        key = super(SelectSelector, self).unregister(fileobj)
327        self._readers.discard(key.fd)
328        self._writers.discard(key.fd)
329        return key
330
331    if sys.platform == 'win32':
332        def _select(self, r, w, _, timeout=None):
333            r, w, x = select.select(r, w, w, timeout)
334            return r, w + x, []
335    else:
336        _select = staticmethod(select.select)
337
338    def select(self, timeout=None):
339        timeout = None if timeout is None else max(timeout, 0)
340        ready = []
341        try:
342            r, w, _ = self._select(self._readers, self._writers, [], timeout)
343        except select.error as exc:
344            if exc.args[0] == EINTR:
345                return ready
346            else:
347                raise
348        r = set(r)
349        w = set(w)
350        for fd in r | w:
351            events = 0
352            if fd in r:
353                events |= EVENT_READ
354            if fd in w:
355                events |= EVENT_WRITE
356
357            key = self._key_from_fd(fd)
358            if key:
359                ready.append((key, events & key.events))
360        return ready
361
362
363if hasattr(select, 'poll'):
364
365    class PollSelector(_BaseSelectorImpl):
366        """Poll-based selector."""
367
368        def __init__(self):
369            super(PollSelector, self).__init__()
370            self._poll = select.poll()
371
372        def register(self, fileobj, events, data=None):
373            key = super(PollSelector, self).register(fileobj, events, data)
374            poll_events = 0
375            if events & EVENT_READ:
376                poll_events |= select.POLLIN
377            if events & EVENT_WRITE:
378                poll_events |= select.POLLOUT
379            self._poll.register(key.fd, poll_events)
380            return key
381
382        def unregister(self, fileobj):
383            key = super(PollSelector, self).unregister(fileobj)
384            self._poll.unregister(key.fd)
385            return key
386
387        def select(self, timeout=None):
388            if timeout is None:
389                timeout = None
390            elif timeout <= 0:
391                timeout = 0
392            else:
393                # poll() has a resolution of 1 millisecond, round away from
394                # zero to wait *at least* timeout seconds.
395                timeout = int(math.ceil(timeout * 1e3))
396            ready = []
397            try:
398                fd_event_list = self._poll.poll(timeout)
399            except select.error as exc:
400                if exc.args[0] == EINTR:
401                    return ready
402                else:
403                    raise
404            for fd, event in fd_event_list:
405                events = 0
406                if event & ~select.POLLIN:
407                    events |= EVENT_WRITE
408                if event & ~select.POLLOUT:
409                    events |= EVENT_READ
410
411                key = self._key_from_fd(fd)
412                if key:
413                    ready.append((key, events & key.events))
414            return ready
415
416
417if hasattr(select, 'epoll'):
418
419    class EpollSelector(_BaseSelectorImpl):
420        """Epoll-based selector."""
421
422        def __init__(self):
423            super(EpollSelector, self).__init__()
424            self._epoll = select.epoll()
425
426        def fileno(self):
427            return self._epoll.fileno()
428
429        def register(self, fileobj, events, data=None):
430            key = super(EpollSelector, self).register(fileobj, events, data)
431            epoll_events = 0
432            if events & EVENT_READ:
433                epoll_events |= select.EPOLLIN
434            if events & EVENT_WRITE:
435                epoll_events |= select.EPOLLOUT
436            self._epoll.register(key.fd, epoll_events)
437            return key
438
439        def unregister(self, fileobj):
440            key = super(EpollSelector, self).unregister(fileobj)
441            try:
442                self._epoll.unregister(key.fd)
443            except IOError:
444                # This can happen if the FD was closed since it
445                # was registered.
446                pass
447            return key
448
449        def select(self, timeout=None):
450            if timeout is None:
451                timeout = -1
452            elif timeout <= 0:
453                timeout = 0
454            else:
455                # epoll_wait() has a resolution of 1 millisecond, round away
456                # from zero to wait *at least* timeout seconds.
457                timeout = math.ceil(timeout * 1e3) * 1e-3
458
459            # epoll_wait() expects `maxevents` to be greater than zero;
460            # we want to make sure that `select()` can be called when no
461            # FD is registered.
462            max_ev = max(len(self._fd_to_key), 1)
463
464            ready = []
465            try:
466                fd_event_list = self._epoll.poll(timeout, max_ev)
467            except IOError as exc:
468                if exc.errno == EINTR:
469                    return ready
470                else:
471                    raise
472            for fd, event in fd_event_list:
473                events = 0
474                if event & ~select.EPOLLIN:
475                    events |= EVENT_WRITE
476                if event & ~select.EPOLLOUT:
477                    events |= EVENT_READ
478
479                key = self._key_from_fd(fd)
480                if key:
481                    ready.append((key, events & key.events))
482            return ready
483
484        def close(self):
485            self._epoll.close()
486            super(EpollSelector, self).close()
487
488
489if hasattr(select, 'devpoll'):
490
491    class DevpollSelector(_BaseSelectorImpl):
492        """Solaris /dev/poll selector."""
493
494        def __init__(self):
495            super(DevpollSelector, self).__init__()
496            self._devpoll = select.devpoll()
497
498        def fileno(self):
499            return self._devpoll.fileno()
500
501        def register(self, fileobj, events, data=None):
502            key = super(DevpollSelector, self).register(fileobj, events, data)
503            poll_events = 0
504            if events & EVENT_READ:
505                poll_events |= select.POLLIN
506            if events & EVENT_WRITE:
507                poll_events |= select.POLLOUT
508            self._devpoll.register(key.fd, poll_events)
509            return key
510
511        def unregister(self, fileobj):
512            key = super(DevpollSelector, self).unregister(fileobj)
513            self._devpoll.unregister(key.fd)
514            return key
515
516        def select(self, timeout=None):
517            if timeout is None:
518                timeout = None
519            elif timeout <= 0:
520                timeout = 0
521            else:
522                # devpoll() has a resolution of 1 millisecond, round away from
523                # zero to wait *at least* timeout seconds.
524                timeout = math.ceil(timeout * 1e3)
525            ready = []
526            try:
527                fd_event_list = self._devpoll.poll(timeout)
528            except OSError as exc:
529                if exc.errno == EINTR:
530                    return ready
531                else:
532                    raise
533            for fd, event in fd_event_list:
534                events = 0
535                if event & ~select.POLLIN:
536                    events |= EVENT_WRITE
537                if event & ~select.POLLOUT:
538                    events |= EVENT_READ
539
540                key = self._key_from_fd(fd)
541                if key:
542                    ready.append((key, events & key.events))
543            return ready
544
545        def close(self):
546            self._devpoll.close()
547            super(DevpollSelector, self).close()
548
549
550if hasattr(select, 'kqueue'):
551
552    class KqueueSelector(_BaseSelectorImpl):
553        """Kqueue-based selector."""
554
555        def __init__(self):
556            super(KqueueSelector, self).__init__()
557            self._kqueue = select.kqueue()
558
559        def fileno(self):
560            return self._kqueue.fileno()
561
562        def register(self, fileobj, events, data=None):
563            key = super(KqueueSelector, self).register(fileobj, events, data)
564            if events & EVENT_READ:
565                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
566                                    select.KQ_EV_ADD)
567                self._kqueue.control([kev], 0, 0)
568            if events & EVENT_WRITE:
569                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
570                                    select.KQ_EV_ADD)
571                self._kqueue.control([kev], 0, 0)
572            return key
573
574        def unregister(self, fileobj):
575            key = super(KqueueSelector, self).unregister(fileobj)
576            if key.events & EVENT_READ:
577                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
578                                    select.KQ_EV_DELETE)
579                try:
580                    self._kqueue.control([kev], 0, 0)
581                except OSError:
582                    # This can happen if the FD was closed since it
583                    # was registered.
584                    pass
585            if key.events & EVENT_WRITE:
586                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
587                                    select.KQ_EV_DELETE)
588                try:
589                    self._kqueue.control([kev], 0, 0)
590                except OSError:
591                    # See comment above.
592                    pass
593            return key
594
595        def select(self, timeout=None):
596            timeout = None if timeout is None else max(timeout, 0)
597            max_ev = len(self._fd_to_key)
598            ready = []
599            try:
600                kev_list = self._kqueue.control(None, max_ev, timeout)
601            except OSError as exc:
602                if exc.errno == EINTR:
603                    return ready
604                else:
605                    raise
606            for kev in kev_list:
607                fd = kev.ident
608                flag = kev.filter
609                events = 0
610                if flag == select.KQ_FILTER_READ:
611                    events |= EVENT_READ
612                if flag == select.KQ_FILTER_WRITE:
613                    events |= EVENT_WRITE
614
615                key = self._key_from_fd(fd)
616                if key:
617                    ready.append((key, events & key.events))
618            return ready
619
620        def close(self):
621            self._kqueue.close()
622            super(KqueueSelector, self).close()
623
624
625# Choose the best implementation, roughly:
626#    epoll|kqueue|devpoll > poll > select.
627# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
628if 'KqueueSelector' in globals():
629    DefaultSelector = KqueueSelector
630elif 'EpollSelector' in globals():
631    DefaultSelector = EpollSelector
632elif 'DevpollSelector' in globals():
633    DefaultSelector = DevpollSelector
634elif 'PollSelector' in globals():
635    DefaultSelector = PollSelector
636else:
637    DefaultSelector = SelectSelector
638