1# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
2"""
3Waiting for I/O completion.
4"""
5from __future__ import absolute_import, division, print_function
6
7import sys
8import select as __select__
9
10from gevent.event import Event
11from gevent.hub import _get_hub_noargs as get_hub
12from gevent.hub import sleep as _g_sleep
13from gevent._compat import integer_types
14from gevent._compat import iteritems
15from gevent._util import copy_globals
16from gevent._util import _NONE
17
18from errno import EINTR
19_real_original_select = __select__.select
20if sys.platform.startswith('win32'):
21    def _original_select(r, w, x, t):
22        # windows can't handle three empty lists, but we've always
23        # accepted that
24        if not r and not w and not x:
25            return ((), (), ())
26        return _real_original_select(r, w, x, t)
27else:
28    _original_select = _real_original_select
29
30# These will be replaced by copy_globals if they are defined by the
31# platform. They're not defined on Windows, but we still provide
32# poll() there. We only pay attention to POLLIN and POLLOUT.
33POLLIN = 1
34POLLPRI = 2
35POLLOUT = 4
36POLLERR = 8
37POLLHUP = 16
38POLLNVAL = 32
39
40POLLRDNORM = 64
41POLLRDBAND = 128
42POLLWRNORM = 4
43POLLWRBAND = 256
44
45__implements__ = [
46    'select',
47]
48if hasattr(__select__, 'poll'):
49    __implements__.append('poll')
50else:
51    __extra__ = [
52        'poll',
53    ]
54
55__all__ = ['error'] + __implements__
56
57error = __select__.error
58
59__imports__ = copy_globals(__select__, globals(),
60                           names_to_ignore=__all__,
61                           dunder_names_to_keep=())
62
63_EV_READ = 1
64_EV_WRITE = 2
65
66def get_fileno(obj):
67    try:
68        fileno_f = obj.fileno
69    except AttributeError:
70        if not isinstance(obj, integer_types):
71            raise TypeError('argument must be an int, or have a fileno() method: %r' % (obj,))
72        return obj
73    else:
74        return fileno_f()
75
76
77class SelectResult(object):
78    __slots__ = ()
79
80    @staticmethod
81    def _make_callback(ready_collection, event, mask):
82        def cb(fd, watcher):
83            ready_collection.append(fd)
84            watcher.close()
85            event.set()
86        cb.mask = mask
87        return cb
88
89    @classmethod
90    def _make_watchers(cls, watchers, *fd_cb):
91        loop = get_hub().loop
92        io = loop.io
93        MAXPRI = loop.MAXPRI
94
95        for fdlist, callback in fd_cb:
96            try:
97                for fd in fdlist:
98                    watcher = io(get_fileno(fd), callback.mask)
99                    watcher.priority = MAXPRI
100                    watchers.append(watcher)
101                    watcher.start(callback, fd, watcher)
102            except IOError as ex:
103                raise error(*ex.args)
104
105    @staticmethod
106    def _closeall(watchers):
107        for watcher in watchers:
108            watcher.stop()
109            watcher.close()
110        del watchers[:]
111
112    def select(self, rlist, wlist, timeout):
113        watchers = []
114        # read and write are the collected ready objects, accumulated
115        # by the callback. Note that we could get spurious callbacks
116        # if the socket is closed while we're blocked. We can't easily
117        # detect that (libev filters the events passed so we can't
118        # pass arbitrary events). After an iteration of polling for
119        # IO, libev will invoke all the pending IO watchers, and then
120        # any newly added (fed) events, and then we will invoke added
121        # callbacks. With libev 4.27+ and EV_VERIFY, it's critical to
122        # close our watcher immediately once we get an event. That
123        # could be the close event (coming just before the actual
124        # close happens), and once the FD is closed, libev will abort
125        # the process if we stop the watcher.
126        read = []
127        write = []
128        event = Event()
129        add_read = self._make_callback(read, event, _EV_READ)
130        add_write = self._make_callback(write, event, _EV_WRITE)
131
132        try:
133            self._make_watchers(watchers,
134                                (rlist, add_read),
135                                (wlist, add_write))
136            event.wait(timeout=timeout)
137            return read, write, []
138        finally:
139            self._closeall(watchers)
140
141
142def select(rlist, wlist, xlist, timeout=None): # pylint:disable=unused-argument
143    """An implementation of :meth:`select.select` that blocks only the current greenlet.
144
145    .. caution:: *xlist* is ignored.
146
147    .. versionchanged:: 1.2a1
148       Raise a :exc:`ValueError` if timeout is negative. This matches Python 3's
149       behaviour (Python 2 would raise a ``select.error``). Previously gevent had
150       undefined behaviour.
151    .. versionchanged:: 1.2a1
152       Raise an exception if any of the file descriptors are invalid.
153    """
154    if timeout is not None and timeout < 0:
155        # Raise an error like the real implementation; which error
156        # depends on the version. Python 3, where select.error is OSError,
157        # raises a ValueError (which makes sense). Older pythons raise
158        # the error from the select syscall...but we don't actually get there.
159        # We choose to just raise the ValueError as it makes more sense and is
160        # forward compatible
161        raise ValueError("timeout must be non-negative")
162
163    # First, do a poll with the original select system call. This is
164    # the most efficient way to check to see if any of the file
165    # descriptors have previously been closed and raise the correct
166    # corresponding exception. (Because libev tends to just return
167    # them as ready, or, if built with EV_VERIFY >= 2 and libev >=
168    # 4.27, crash the process. And libuv also tends to crash the
169    # process.)
170    #
171    # We accept the *xlist* here even though we can't
172    # below because this is all about error handling.
173    sel_results = ((), (), ())
174    try:
175        sel_results = _original_select(rlist, wlist, xlist, 0)
176    except error as e:
177        enumber = getattr(e, 'errno', None) or e.args[0]
178        if enumber != EINTR:
179            # Ignore interrupted syscalls
180            raise
181
182    if sel_results[0] or sel_results[1] or sel_results[2] or (timeout is not None and timeout == 0):
183        # If we actually had stuff ready, go ahead and return it. No need
184        # to go through the trouble of doing our own stuff.
185
186        # Likewise, if the timeout is 0, we already did a 0 timeout
187        # select and we don't need to do it again. Note that in libuv,
188        # zero duration timers may be called immediately, without
189        # cycling the event loop at all. 2.7/test_telnetlib.py "hangs"
190        # calling zero-duration timers if we go to the loop here.
191
192        # However, because this is typically a place where scheduling switches
193        # can occur, we need to make sure that's still the case; otherwise a single
194        # consumer could monopolize the thread. (shows up in test_ftplib.)
195        _g_sleep()
196        return sel_results
197
198    result = SelectResult()
199    return result.select(rlist, wlist, timeout)
200
201
202
203class PollResult(object):
204    __slots__ = ('events', 'event')
205
206    def __init__(self):
207        self.events = set()
208        self.event = Event()
209
210    def add_event(self, events, fd):
211        if events < 0:
212            result_flags = POLLNVAL
213        else:
214            result_flags = 0
215            if events & _EV_READ:
216                result_flags = POLLIN
217            if events & _EV_WRITE:
218                result_flags |= POLLOUT
219
220        self.events.add((fd, result_flags))
221        self.event.set()
222
223class poll(object):
224    """
225    An implementation of :class:`select.poll` that blocks only the current greenlet.
226
227    .. caution:: ``POLLPRI`` data is not supported.
228
229    .. versionadded:: 1.1b1
230    .. versionchanged:: 1.5
231       This is now always defined, regardless of whether the standard library
232       defines :func:`select.poll` or not. Note that it may have different performance
233       characteristics.
234    """
235    def __init__(self):
236        # {int -> flags}
237        # We can't keep watcher objects in here because people commonly
238        # just drop the poll object when they're done, without calling
239        # unregister(). dnspython does this.
240        self.fds = {}
241        self.loop = get_hub().loop
242
243    def register(self, fd, eventmask=_NONE):
244        if eventmask is _NONE:
245            flags = _EV_READ | _EV_WRITE
246        else:
247            flags = 0
248            if eventmask & POLLIN:
249                flags = _EV_READ
250            if eventmask & POLLOUT:
251                flags |= _EV_WRITE
252            # If they ask for POLLPRI, we can't support
253            # that. Should we raise an error?
254
255        fileno = get_fileno(fd)
256        self.fds[fileno] = flags
257
258    def modify(self, fd, eventmask):
259        self.register(fd, eventmask)
260
261    def _get_started_watchers(self, watcher_cb):
262        watchers = []
263        io = self.loop.io
264        MAXPRI = self.loop.MAXPRI
265
266        try:
267            for fd, flags in iteritems(self.fds):
268                watcher = io(fd, flags)
269                watchers.append(watcher)
270                watcher.priority = MAXPRI
271                watcher.start(watcher_cb, fd, pass_events=True)
272        except:
273            for awatcher in watchers:
274                awatcher.stop()
275                awatcher.close()
276            raise
277        return watchers
278
279
280    def poll(self, timeout=None):
281        """
282        poll the registered fds.
283
284        .. versionchanged:: 1.2a1
285           File descriptors that are closed are reported with POLLNVAL.
286
287        .. versionchanged:: 1.3a2
288           Under libuv, interpret *timeout* values less than 0 the same as *None*,
289           i.e., block. This was always the case with libev.
290        """
291        result = PollResult()
292        watchers = self._get_started_watchers(result.add_event)
293        try:
294            if timeout is not None:
295                if timeout < 0:
296                    # The docs for python say that an omitted timeout,
297                    # a negative timeout and a timeout of None are all
298                    # supposed to block forever. Many, but not all
299                    # OS's accept any negative number to mean that. Some
300                    # OS's raise errors for anything negative but not -1.
301                    # Python 3.7 changes to always pass exactly -1 in that
302                    # case from selectors.
303
304                    # Our Timeout class currently does not have a defined behaviour
305                    # for negative values. On libuv, it uses a check watcher and effectively
306                    # doesn't block. On libev, it seems to block. In either case, we
307                    # *want* to block, so turn this into the sure fire block request.
308                    timeout = None
309                elif timeout:
310                    # The docs for poll.poll say timeout is in
311                    # milliseconds. Our result objects work in
312                    # seconds, so this should be *=, shouldn't it?
313                    timeout /= 1000.0
314            result.event.wait(timeout=timeout)
315            return list(result.events)
316        finally:
317            for awatcher in watchers:
318                awatcher.stop()
319                awatcher.close()
320
321    def unregister(self, fd):
322        """
323        Unregister the *fd*.
324
325        .. versionchanged:: 1.2a1
326           Raise a `KeyError` if *fd* was not registered, like the standard
327           library. Previously gevent did nothing.
328        """
329        fileno = get_fileno(fd)
330        del self.fds[fileno]
331
332
333def _gevent_do_monkey_patch(patch_request):
334    aggressive = patch_request.patch_kwargs['aggressive']
335
336    patch_request.default_patch_items()
337
338    if aggressive:
339        # since these are blocking we're removing them here. This makes some other
340        # modules (e.g. asyncore)  non-blocking, as they use select that we provide
341        # when none of these are available.
342        patch_request.remove_item(
343            'epoll',
344            'kqueue',
345            'kevent',
346            'devpoll',
347        )
348