1# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details. 2""" 3Waiting for I/O completion. 4""" 5from __future__ import absolute_import, division, print_function 6 7import sys 8import select as __select__ 9 10from gevent.event import Event 11from gevent.hub import _get_hub_noargs as get_hub 12from gevent.hub import sleep as _g_sleep 13from gevent._compat import integer_types 14from gevent._compat import iteritems 15from gevent._util import copy_globals 16from gevent._util import _NONE 17 18from errno import EINTR 19_real_original_select = __select__.select 20if sys.platform.startswith('win32'): 21 def _original_select(r, w, x, t): 22 # windows can't handle three empty lists, but we've always 23 # accepted that 24 if not r and not w and not x: 25 return ((), (), ()) 26 return _real_original_select(r, w, x, t) 27else: 28 _original_select = _real_original_select 29 30# These will be replaced by copy_globals if they are defined by the 31# platform. They're not defined on Windows, but we still provide 32# poll() there. We only pay attention to POLLIN and POLLOUT. 
POLLIN = 1
POLLPRI = 2
POLLOUT = 4
POLLERR = 8
POLLHUP = 16
POLLNVAL = 32

POLLRDNORM = 64
POLLRDBAND = 128
POLLWRNORM = 4
POLLWRBAND = 256

# 'select' is always implemented here. 'poll' is listed as an
# implementation only when the platform module has one; otherwise it
# is advertised as an extra.
__implements__ = ['select']
if hasattr(__select__, 'poll'):
    __implements__.append('poll')
else:
    __extra__ = ['poll']

__all__ = ['error'] + __implements__

error = __select__.error

# Copy the remaining names from the platform module into this one,
# skipping the names defined in this module.
__imports__ = copy_globals(
    __select__,
    globals(),
    names_to_ignore=__all__,
    dunder_names_to_keep=(),
)

# Event masks handed to the loop's io watchers.
_EV_READ = 1
_EV_WRITE = 2


def get_fileno(obj):
    """
    Return the file descriptor for *obj*.

    If *obj* has a ``fileno`` attribute, it is called and its result
    returned. Otherwise *obj* itself is returned, provided it is an
    integer.

    :raises TypeError: If *obj* has no ``fileno`` attribute and is not
        an integer.
    """
    try:
        fileno_method = obj.fileno
    except AttributeError:
        if isinstance(obj, integer_types):
            return obj
        raise TypeError('argument must be an int, or have a fileno() method: %r' % (obj,))
    return fileno_method()


class SelectResult(object):
    """
    Helper for a single :func:`select` call: starts io watchers for the
    requested objects and collects those that become ready.
    """
    __slots__ = ()

    @staticmethod
    def _make_callback(ready_collection, event, mask):
        """
        Build a watcher callback that appends the ready object to
        *ready_collection*, closes its watcher and sets *event*.

        The watcher event *mask* is attached to the returned callback
        as its ``mask`` attribute.
        """
        def cb(fd, watcher):
            ready_collection.append(fd)
            # Close right away; see the comment in select() for why.
            watcher.close()
            event.set()
        cb.mask = mask
        return cb

    @classmethod
    def _make_watchers(cls, watchers, *fd_cb):
        """
        Create and start one io watcher per object in each
        ``(fdlist, callback)`` pair of *fd_cb*.

        Every watcher is appended to *watchers* before it is started,
        so the caller can clean up even if this method raises part way
        through.

        :raises error: If an :exc:`IOError` is raised while setting up
            a watcher; the original arguments are carried over.
        """
        hub_loop = get_hub().loop
        new_io_watcher = hub_loop.io
        top_priority = hub_loop.MAXPRI

        for fdlist, callback in fd_cb:
            try:
                for fd in fdlist:
                    io_watcher = new_io_watcher(get_fileno(fd), callback.mask)
                    io_watcher.priority = top_priority
                    watchers.append(io_watcher)
                    io_watcher.start(callback, fd, io_watcher)
            except IOError as ex:
                raise error(*ex.args)

    @staticmethod
    def _closeall(watchers):
        """Stop and close every watcher, then empty the *watchers* list in place."""
        for io_watcher in watchers:
            io_watcher.stop()
            io_watcher.close()
        del watchers[:]

    def select(self, rlist, wlist, timeout):
        """
        Wait up to *timeout* for anything in *rlist* or *wlist* to
        become ready.

        Returns a ``(read, write, [])`` tuple of the ready objects; the
        third element is always an empty list.
        """
        watchers = []
        # The two lists below collect the ready objects; they are
        # filled in by the watcher callbacks. Note that we could get
        # spurious callbacks if the socket is closed while we're
        # blocked. We can't easily detect that (libev filters the
        # events passed so we can't pass arbitrary events). After an
        # iteration of polling for IO, libev will invoke all the
        # pending IO watchers, and then any newly added (fed) events,
        # and then we will invoke added callbacks. With libev 4.27+
        # and EV_VERIFY, it's critical to close our watcher
        # immediately once we get an event. That could be the close
        # event (coming just before the actual close happens), and
        # once the FD is closed, libev will abort the process if we
        # stop the watcher.
        ready_to_read = []
        ready_to_write = []
        ready_event = Event()
        read_pair = (rlist, self._make_callback(ready_to_read, ready_event, _EV_READ))
        write_pair = (wlist, self._make_callback(ready_to_write, ready_event, _EV_WRITE))

        try:
            self._make_watchers(watchers, read_pair, write_pair)
            ready_event.wait(timeout=timeout)
            return ready_to_read, ready_to_write, []
        finally:
            self._closeall(watchers)


def select(rlist, wlist, xlist, timeout=None): # pylint:disable=unused-argument
    """An implementation of :meth:`select.select` that blocks only the current greenlet.

    .. caution:: *xlist* is ignored.

    .. versionchanged:: 1.2a1
       Raise a :exc:`ValueError` if timeout is negative. This matches Python 3's
       behaviour (Python 2 would raise a ``select.error``). Previously gevent had
       undefined behaviour.
    .. versionchanged:: 1.2a1
       Raise an exception if any of the file descriptors are invalid.
    """
    if timeout is not None and timeout < 0:
        # Raise an error like the real implementation; which error
        # depends on the version. Python 3, where select.error is OSError,
        # raises a ValueError (which makes sense). Older pythons raise
        # the error from the select syscall...but we don't actually get there.
        # We choose to just raise the ValueError as it makes more sense and is
        # forward compatible
        raise ValueError("timeout must be non-negative")

    # First, do a poll with the original select system call. This is
    # the most efficient way to check to see if any of the file
    # descriptors have previously been closed and raise the correct
    # corresponding exception. (Because libev tends to just return
    # them as ready, or, if built with EV_VERIFY >= 2 and libev >=
    # 4.27, crash the process. And libuv also tends to crash the
    # process.)
    #
    # We accept the *xlist* here even though we can't
    # below because this is all about error handling.
    sel_results = ((), (), ())
    try:
        sel_results = _original_select(rlist, wlist, xlist, 0)
    except error as err:
        err_number = getattr(err, 'errno', None) or err.args[0]
        if err_number != EINTR:
            # Only interrupted syscalls are ignored.
            raise

    anything_ready = sel_results[0] or sel_results[1] or sel_results[2]
    if anything_ready or (timeout is not None and timeout == 0):
        # If we actually had stuff ready, go ahead and return it. No need
        # to go through the trouble of doing our own stuff.

        # Likewise, if the timeout is 0, we already did a 0 timeout
        # select and we don't need to do it again. Note that in libuv,
        # zero duration timers may be called immediately, without
        # cycling the event loop at all. 2.7/test_telnetlib.py "hangs"
        # calling zero-duration timers if we go to the loop here.

        # However, because this is typically a place where scheduling switches
        # can occur, we need to make sure that's still the case; otherwise a single
        # consumer could monopolize the thread. (shows up in test_ftplib.)
        _g_sleep()
        return sel_results

    return SelectResult().select(rlist, wlist, timeout)


class PollResult(object):
    """
    Accumulates the ``(fd, flags)`` pairs reported by io watchers during
    one :meth:`poll.poll` call.
    """
    __slots__ = ('events', 'event')

    def __init__(self):
        # Set of (fd, result_flags) tuples collected so far.
        self.events = set()
        # Set as soon as the first event has been recorded.
        self.event = Event()

    def add_event(self, events, fd):
        """
        Watcher callback: record *fd* with the poll flags that
        correspond to the watcher *events*.

        A negative *events* value is recorded as ``POLLNVAL``.
        """
        if events < 0:
            poll_flags = POLLNVAL
        else:
            poll_flags = POLLIN if events & _EV_READ else 0
            if events & _EV_WRITE:
                poll_flags |= POLLOUT

        self.events.add((fd, poll_flags))
        self.event.set()


class poll(object):
    """
    An implementation of :class:`select.poll` that blocks only the current greenlet.

    .. caution:: ``POLLPRI`` data is not supported.

    .. versionadded:: 1.1b1
    .. versionchanged:: 1.5
       This is now always defined, regardless of whether the standard library
       defines :func:`select.poll` or not. Note that it may have different performance
       characteristics.
    """
    def __init__(self):
        # {int -> flags}
        # We can't keep watcher objects in here because people commonly
        # just drop the poll object when they're done, without calling
        # unregister(). dnspython does this.
        self.fds = {}
        self.loop = get_hub().loop

    def register(self, fd, eventmask=_NONE):
        """
        Register *fd* (an integer or an object with ``fileno()``).

        Without an *eventmask*, both reading and writing are watched;
        otherwise only the ``POLLIN`` and ``POLLOUT`` bits of
        *eventmask* are honoured. Registering an already-registered
        descriptor replaces its flags.
        """
        if eventmask is _NONE:
            watch_flags = _EV_READ | _EV_WRITE
        else:
            watch_flags = _EV_READ if eventmask & POLLIN else 0
            if eventmask & POLLOUT:
                watch_flags |= _EV_WRITE
            # If they ask for POLLPRI, we can't support
            # that. Should we raise an error?

        self.fds[get_fileno(fd)] = watch_flags

    def modify(self, fd, eventmask):
        """Replace the event mask of *fd*; this simply calls :meth:`register`."""
        self.register(fd, eventmask)

    @staticmethod
    def _stop_and_close(watchers):
        """Stop and then close each watcher in *watchers*, in order."""
        for io_watcher in watchers:
            io_watcher.stop()
            io_watcher.close()

    def _get_started_watchers(self, watcher_cb):
        """
        Create and start an io watcher for every registered descriptor,
        each invoking ``watcher_cb(events, fd)``, and return them as a
        list.

        If anything goes wrong, the watchers created so far are stopped
        and closed before the exception propagates.
        """
        started = []
        new_io_watcher = self.loop.io
        top_priority = self.loop.MAXPRI

        try:
            for fd, flags in iteritems(self.fds):
                io_watcher = new_io_watcher(fd, flags)
                started.append(io_watcher)
                io_watcher.priority = top_priority
                io_watcher.start(watcher_cb, fd, pass_events=True)
        except:
            # Deliberately catches everything: this is cleanup only, and
            # the exception is re-raised untouched.
            self._stop_and_close(started)
            raise
        return started

    def poll(self, timeout=None):
        """
        poll the registered fds.

        .. versionchanged:: 1.2a1
           File descriptors that are closed are reported with POLLNVAL.

        .. versionchanged:: 1.3a2
           Under libuv, interpret *timeout* values less than 0 the same as *None*,
           i.e., block. This was always the case with libev.
        """
        result = PollResult()
        watchers = self._get_started_watchers(result.add_event)
        try:
            wait_seconds = timeout
            if timeout is not None:
                if timeout < 0:
                    # The docs for python say that an omitted timeout,
                    # a negative timeout and a timeout of None are all
                    # supposed to block forever. Many, but not all
                    # OS's accept any negative number to mean that. Some
                    # OS's raise errors for anything negative but not -1.
                    # Python 3.7 changes to always pass exactly -1 in that
                    # case from selectors.

                    # Our Timeout class currently does not have a defined behaviour
                    # for negative values. On libuv, it uses a check watcher and effectively
                    # doesn't block. On libev, it seems to block. In either case, we
                    # *want* to block, so turn this into the sure fire block request.
                    wait_seconds = None
                elif timeout:
                    # The docs for poll.poll say timeout is in
                    # milliseconds, while our result objects work in
                    # seconds, so convert by dividing by 1000.
                    wait_seconds = timeout / 1000.0
            result.event.wait(timeout=wait_seconds)
            return list(result.events)
        finally:
            self._stop_and_close(watchers)

    def unregister(self, fd):
        """
        Unregister the *fd*.

        .. versionchanged:: 1.2a1
           Raise a `KeyError` if *fd* was not registered, like the standard
           library. Previously gevent did nothing.
        """
        del self.fds[get_fileno(fd)]


def _gevent_do_monkey_patch(patch_request):
    """
    Apply the default patch items for this module and, when the
    ``aggressive`` patch keyword is true, also remove the blocking
    multiplexing APIs.
    """
    # Read the option first so a missing key fails before anything is patched.
    aggressive = patch_request.patch_kwargs['aggressive']

    patch_request.default_patch_items()

    if not aggressive:
        return

    # since these are blocking we're removing them here. This makes some other
    # modules (e.g. asyncore) non-blocking, as they use select that we provide
    # when none of these are available.
    patch_request.remove_item('epoll', 'kqueue', 'kevent', 'devpoll')