# Backport of selectors.py from Python 3.5+ to support Python < 3.4
# Also has the behavior specified in PEP 475 which is to retry syscalls
# in the case of an EINTR error. This module is required because selectors34
# does not follow this behavior and instead returns that no file descriptor
# events have occurred rather than retry the syscall. The decision to drop
# support for select.devpoll is made to maintain 100% test coverage.

import errno
import math
import select
from collections import namedtuple

try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import Mapping
except ImportError:  # Python 2
    from collections import Mapping

import time
try:
    monotonic = time.monotonic
except (AttributeError, ImportError):  # Python 3.3<
    monotonic = time.time

EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)

HAS_SELECT = True  # Variable that shows whether the platform has a selector.
_SYSCALL_SENTINEL = object()  # Sentinel in case a system call returns None.


class SelectorError(Exception):
    """ Raised when a selector system call fails with an error code
    other than EINTR. The code is available as the `errno` attribute. """

    def __init__(self, errcode):
        super(SelectorError, self).__init__()
        self.errno = errcode

    def __repr__(self):
        return "<SelectorError errno={0}>".format(self.errno)

    def __str__(self):
        return self.__repr__()


def _fileobj_to_fd(fileobj):
    """ Return a file descriptor from a file object. If
    given an integer will simply return that integer back.

    Raises ValueError if the object has no usable `fileno()` or if
    the resulting descriptor is negative. """
    if isinstance(fileobj, int):
        fd = fileobj
    else:
        try:
            fd = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            raise ValueError("Invalid file object: {0!r}".format(fileobj))
    if fd < 0:
        raise ValueError("Invalid file descriptor: {0}".format(fd))
    return fd


def _syscall_wrapper(func, recalc_timeout, *args, **kwargs):
    """ Wrapper function for syscalls that could fail due to EINTR.
    All functions should be retried if there is time left in the timeout
    in accordance with PEP 475.

    `func` is called as `func(*args, **kwargs)`. A deadline is only
    tracked when a non-negative `timeout` is supplied as a keyword
    argument; when `recalc_timeout` is true the remaining time is passed
    as `timeout` on every retry.

    Raises OSError(ETIMEDOUT) if an interrupted call cannot be retried
    because the deadline has passed, and SelectorError for any other
    failure that carries an error code. """
    timeout = kwargs.get("timeout", None)
    if timeout is None:
        expires = None
        recalc_timeout = False
    else:
        timeout = float(timeout)
        if timeout < 0.0:  # Timeout less than 0 treated as no timeout.
            expires = None
        else:
            expires = monotonic() + timeout

    result = _SYSCALL_SENTINEL
    while result is _SYSCALL_SENTINEL:
        try:
            result = func(*args, **kwargs)
        # OSError is thrown by select.select
        # IOError is thrown by select.epoll.poll
        # select.error is thrown by select.poll.poll
        # Aren't we thankful for Python 3.x rework for exceptions?
        except (OSError, IOError, select.error) as e:
            # select.error wasn't a subclass of OSError in the past.
            errcode = None
            if hasattr(e, "errno"):
                errcode = e.errno
            elif getattr(e, "args", None):
                errcode = e.args[0]

            # Also test for the Windows equivalent of EINTR.
            is_interrupt = (errcode == errno.EINTR or
                            (hasattr(errno, "WSAEINTR") and
                             errcode == errno.WSAEINTR))

            if is_interrupt:
                if expires is not None:
                    current_time = monotonic()
                    if current_time > expires:
                        # OSError takes no keyword arguments, so the
                        # code has to be passed positionally.
                        raise OSError(errno.ETIMEDOUT,
                                      "Connection timed out")
                    if recalc_timeout:
                        kwargs["timeout"] = expires - current_time
                continue
            if errcode:
                raise SelectorError(errcode)
            else:
                raise
    return result


SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])


class _SelectorMapping(Mapping):
    """ Mapping of file objects to selector keys """

    def __init__(self, selector):
        self._selector = selector

    def __len__(self):
        return len(self._selector._fd_to_key)

    def __getitem__(self, fileobj):
        try:
            fd = self._selector._fileobj_lookup(fileobj)
            return self._selector._fd_to_key[fd]
        except KeyError:
            raise KeyError("{0!r} is not registered.".format(fileobj))

    def __iter__(self):
        return iter(self._selector._fd_to_key)


class BaseSelector(object):
    """ Abstract Selector class

    A selector supports registering file objects to be monitored
    for specific I/O events.

    A file object is a file descriptor or any object with a
    `fileno()` method. An arbitrary object can be attached to the
    file object which can be used for example to store context info,
    a callback, etc.

    A selector can use various implementations (select(), poll(), epoll(),
    and kqueue()) depending on the platform. The 'DefaultSelector' class uses
    the most efficient implementation for the current platform.
    """
    def __init__(self):
        # Maps file descriptors to keys.
        self._fd_to_key = {}

        # Read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """ Return a file descriptor from a file object.
        This wraps _fileobj_to_fd() to do an exhaustive
        search in case the object is invalid but we still
        have it in our map. Used by unregister() so we can
        unregister an object that was previously registered
        even if it is closed. It is also used by _SelectorMapping
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:

            # Search through all our mapped keys.
            for key in self._fd_to_key.values():
                if key.fileobj is fileobj:
                    return key.fd

            # Raise ValueError after all.
            raise

    def register(self, fileobj, events, data=None):
        """ Register a file object for a set of events to monitor.

        Returns the new SelectorKey. Raises ValueError for an empty or
        unknown event mask and KeyError if the descriptor is already
        registered. """
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError("Invalid events: {0!r}".format(events))

        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError("{0!r} (FD {1}) is already registered"
                           .format(fileobj, key.fd))

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        """ Unregister a file object from being monitored.

        Returns the removed SelectorKey. Raises KeyError if the file
        object is not registered. """
        try:
            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))
        return key

    def modify(self, fileobj, events, data=None):
        """ Change a registered file object monitored events and data.

        Returns the resulting SelectorKey. Raises KeyError if the file
        object is not registered. """
        # NOTE: Some subclasses optimize this operation even further.
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))

        if events != key.events:
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)

        elif data != key.data:
            # Use a shortcut to update the data.
            key = key._replace(data=data)
            self._fd_to_key[key.fd] = key

        return key

    def select(self, timeout=None):
        """ Perform the actual selection until some monitored file objects
        are ready or the timeout expires. """
        raise NotImplementedError()

    def close(self):
        """ Close the selector. This must be called to ensure that all
        underlying resources are freed. """
        self._fd_to_key.clear()
        self._map = None

    def get_key(self, fileobj):
        """ Return the key associated with a registered file object.

        Raises RuntimeError if the selector has been closed and KeyError
        if the file object is not registered. """
        mapping = self.get_map()
        if mapping is None:
            raise RuntimeError("Selector is closed")
        try:
            return mapping[fileobj]
        except KeyError:
            raise KeyError("{0!r} is not registered".format(fileobj))

    def get_map(self):
        """ Return a mapping of file objects to selector keys """
        return self._map

    def _key_from_fd(self, fd):
        """ Return the key associated to a given file descriptor
        Return None if it is not found. """
        try:
            return self._fd_to_key[fd]
        except KeyError:
            return None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


# Almost all platforms have select.select()
if hasattr(select, "select"):
    class SelectSelector(BaseSelector):
        """ Select-based selector. """
        def __init__(self):
            super(SelectSelector, self).__init__()
            self._readers = set()
            self._writers = set()

        def register(self, fileobj, events, data=None):
            key = super(SelectSelector, self).register(fileobj, events, data)
            if events & EVENT_READ:
                self._readers.add(key.fd)
            if events & EVENT_WRITE:
                self._writers.add(key.fd)
            return key

        def unregister(self, fileobj):
            key = super(SelectSelector, self).unregister(fileobj)
            self._readers.discard(key.fd)
            self._writers.discard(key.fd)
            return key

        def _select(self, r, w, timeout=None):
            """ Wrapper for select.select because timeout is a positional arg """
            return select.select(r, w, [], timeout)

        def select(self, timeout=None):
            # Selecting on empty lists on Windows errors out.
            if not self._readers and not self._writers:
                return []

            timeout = None if timeout is None else max(timeout, 0.0)
            ready = []
            # The timeout must be passed by keyword: _syscall_wrapper only
            # tracks (and recalculates) a timeout it finds in kwargs.
            r, w, _ = _syscall_wrapper(self._select, True, self._readers,
                                       self._writers, timeout=timeout)
            r = set(r)
            w = set(w)
            for fd in r | w:
                events = 0
                if fd in r:
                    events |= EVENT_READ
                if fd in w:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready


if hasattr(select, "poll"):
    class PollSelector(BaseSelector):
        """ Poll-based selector """
        def __init__(self):
            super(PollSelector, self).__init__()
            self._poll = select.poll()

        def register(self, fileobj, events, data=None):
            key = super(PollSelector, self).register(fileobj, events, data)
            event_mask = 0
            if events & EVENT_READ:
                event_mask |= select.POLLIN
            if events & EVENT_WRITE:
                event_mask |= select.POLLOUT
            self._poll.register(key.fd, event_mask)
            return key

        def unregister(self, fileobj):
            key = super(PollSelector, self).unregister(fileobj)
            self._poll.unregister(key.fd)
            return key

        def _wrap_poll(self, timeout=None):
            """ Wrapper function for select.poll.poll() so that
            _syscall_wrapper can work with only seconds. """
            if timeout is not None:
                if timeout <= 0:
                    timeout = 0
                else:
                    # select.poll.poll() has a resolution of 1 millisecond,
                    # round away from zero to wait *at least* timeout seconds.
                    timeout = math.ceil(timeout * 1e3)

            result = self._poll.poll(timeout)
            return result

        def select(self, timeout=None):
            ready = []
            fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)
            for fd, event_mask in fd_events:
                events = 0
                # Any bit other than POLLIN counts as a write event and any
                # bit other than POLLOUT counts as a read event; the result
                # is masked with the registered events below.
                if event_mask & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event_mask & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))

            return ready


if hasattr(select, "epoll"):
    class EpollSelector(BaseSelector):
        """ Epoll-based selector """
        def __init__(self):
            super(EpollSelector, self).__init__()
            self._epoll = select.epoll()

        def fileno(self):
            return self._epoll.fileno()

        def register(self, fileobj, events, data=None):
            key = super(EpollSelector, self).register(fileobj, events, data)
            events_mask = 0
            if events & EVENT_READ:
                events_mask |= select.EPOLLIN
            if events & EVENT_WRITE:
                events_mask |= select.EPOLLOUT
            _syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
            return key

        def unregister(self, fileobj):
            key = super(EpollSelector, self).unregister(fileobj)
            try:
                _syscall_wrapper(self._epoll.unregister, False, key.fd)
            except SelectorError:
                # This can occur when the fd was closed since registry.
                pass
            return key

        def select(self, timeout=None):
            if timeout is not None:
                if timeout <= 0:
                    timeout = 0.0
                else:
                    # select.epoll.poll() has a resolution of 1 millisecond
                    # but luckily takes seconds so we don't need a wrapper
                    # like PollSelector. Just for better rounding.
                    timeout = math.ceil(timeout * 1e3) * 1e-3
                timeout = float(timeout)
            else:
                timeout = -1.0  # epoll.poll() must have a float.

            # We always want at least 1 to ensure that select can be called
            # with no file descriptors registered. Otherwise will fail.
            max_events = max(len(self._fd_to_key), 1)

            ready = []
            fd_events = _syscall_wrapper(self._epoll.poll, True,
                                         timeout=timeout,
                                         maxevents=max_events)
            for fd, event_mask in fd_events:
                events = 0
                # Same mask translation as PollSelector.select().
                if event_mask & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event_mask & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._epoll.close()
            super(EpollSelector, self).close()


if hasattr(select, "kqueue"):
    class KqueueSelector(BaseSelector):
        """ Kqueue / Kevent-based selector """
        def __init__(self):
            super(KqueueSelector, self).__init__()
            self._kqueue = select.kqueue()

        def fileno(self):
            return self._kqueue.fileno()

        def register(self, fileobj, events, data=None):
            key = super(KqueueSelector, self).register(fileobj, events, data)
            if events & EVENT_READ:
                kevent = select.kevent(key.fd,
                                       select.KQ_FILTER_READ,
                                       select.KQ_EV_ADD)

                _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)

            if events & EVENT_WRITE:
                kevent = select.kevent(key.fd,
                                       select.KQ_FILTER_WRITE,
                                       select.KQ_EV_ADD)

                _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)

            return key

        def unregister(self, fileobj):
            key = super(KqueueSelector, self).unregister(fileobj)
            if key.events & EVENT_READ:
                kevent = select.kevent(key.fd,
                                       select.KQ_FILTER_READ,
                                       select.KQ_EV_DELETE)
                try:
                    _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
                except SelectorError:
                    pass
            if key.events & EVENT_WRITE:
                kevent = select.kevent(key.fd,
                                       select.KQ_FILTER_WRITE,
                                       select.KQ_EV_DELETE)
                try:
                    _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
                except SelectorError:
                    pass

            return key

        def _wrap_control(self, changelist, max_events, timeout=None):
            """ Wrapper for kqueue.control() so the timeout can be given
            by keyword, which is where _syscall_wrapper looks for it. """
            return self._kqueue.control(changelist, max_events, timeout)

        def select(self, timeout=None):
            if timeout is not None:
                timeout = max(timeout, 0)

            # Read and write filters are registered as separate kevents,
            # so each descriptor can produce up to two events.
            max_events = len(self._fd_to_key) * 2
            ready_fds = {}

            kevent_list = _syscall_wrapper(self._wrap_control, True,
                                           None, max_events, timeout=timeout)

            for kevent in kevent_list:
                fd = kevent.ident
                event_mask = kevent.filter
                events = 0
                if event_mask == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if event_mask == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    if key.fd not in ready_fds:
                        ready_fds[key.fd] = (key, events & key.events)
                    else:
                        # Merge the read and write events of the same fd.
                        old_events = ready_fds[key.fd][1]
                        ready_fds[key.fd] = (key, (events | old_events) & key.events)

            return list(ready_fds.values())

        def close(self):
            self._kqueue.close()
            super(KqueueSelector, self).close()


# Choose the best implementation, roughly:
# kqueue == epoll > poll > select. Devpoll not supported. (See above)
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():  # Platform-specific: Mac OS and BSD
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():  # Platform-specific: Linux
    DefaultSelector = EpollSelector
elif 'PollSelector' in globals():  # Platform-specific: Linux
    DefaultSelector = PollSelector
elif 'SelectSelector' in globals():  # Platform-specific: Windows
    DefaultSelector = SelectSelector
else:  # Platform-specific: AppEngine
    def no_selector(*_args, **_kwargs):
        # Accept any arguments so that `DefaultSelector()` reports the
        # missing selector instead of failing with a TypeError.
        raise ValueError("Platform does not have a selector")
    DefaultSelector = no_selector
    HAS_SELECT = False