1# pylint: skip-file 2# vendored from https://github.com/berkerpeksag/selectors34 3# at commit ff61b82168d2cc9c4922ae08e2a8bf94aab61ea2 (unreleased, ~1.2) 4# 5# Original author: Charles-Francois Natali (c.f.natali[at]gmail.com) 6# Maintainer: Berker Peksag (berker.peksag[at]gmail.com) 7# Also see https://pypi.python.org/pypi/selectors34 8"""Selectors module. 9 10This module allows high-level and efficient I/O multiplexing, built upon the 11`select` module primitives. 12 13The following code adapted from trollius.selectors. 14""" 15from __future__ import absolute_import 16 17from abc import ABCMeta, abstractmethod 18from collections import namedtuple, Mapping 19from errno import EINTR 20import math 21import select 22import sys 23 24from kafka.vendor import six 25 26 27def _wrap_error(exc, mapping, key): 28 if key not in mapping: 29 return 30 new_err_cls = mapping[key] 31 new_err = new_err_cls(*exc.args) 32 33 # raise a new exception with the original traceback 34 if hasattr(exc, '__traceback__'): 35 traceback = exc.__traceback__ 36 else: 37 traceback = sys.exc_info()[2] 38 six.reraise(new_err_cls, new_err, traceback) 39 40 41# generic events, that must be mapped to implementation-specific ones 42EVENT_READ = (1 << 0) 43EVENT_WRITE = (1 << 1) 44 45 46def _fileobj_to_fd(fileobj): 47 """Return a file descriptor from a file object. 48 49 Parameters: 50 fileobj -- file object or file descriptor 51 52 Returns: 53 corresponding file descriptor 54 55 Raises: 56 ValueError if the object is invalid 57 """ 58 if isinstance(fileobj, six.integer_types): 59 fd = fileobj 60 else: 61 try: 62 fd = int(fileobj.fileno()) 63 except (AttributeError, TypeError, ValueError): 64 raise ValueError("Invalid file object: " 65 "{0!r}".format(fileobj)) 66 if fd < 0: 67 raise ValueError("Invalid file descriptor: {0}".format(fd)) 68 return fd 69 70 71SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) 72"""Object used to associate a file object to its backing file descriptor, 73selected event mask and attached data.""" 74 75 76class _SelectorMapping(Mapping): 77 """Mapping of file objects to selector keys.""" 78 79 def __init__(self, selector): 80 self._selector = selector 81 82 def __len__(self): 83 return len(self._selector._fd_to_key) 84 85 def __getitem__(self, fileobj): 86 try: 87 fd = self._selector._fileobj_lookup(fileobj) 88 return self._selector._fd_to_key[fd] 89 except KeyError: 90 raise KeyError("{0!r} is not registered".format(fileobj)) 91 92 def __iter__(self): 93 return iter(self._selector._fd_to_key) 94 95# Using six.add_metaclass() decorator instead of six.with_metaclass() because 96# the latter leaks temporary_class to garbage with gc disabled 97@six.add_metaclass(ABCMeta) 98class BaseSelector(object): 99 """Selector abstract base class. 100 101 A selector supports registering file objects to be monitored for specific 102 I/O events. 103 104 A file object is a file descriptor or any object with a `fileno()` method. 105 An arbitrary object can be attached to the file object, which can be used 106 for example to store context information, a callback, etc. 107 108 A selector can use various implementations (select(), poll(), epoll()...) 109 depending on the platform. The default `Selector` class uses the most 110 efficient implementation on the current platform. 111 """ 112 113 @abstractmethod 114 def register(self, fileobj, events, data=None): 115 """Register a file object. 116 117 Parameters: 118 fileobj -- file object or file descriptor 119 events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) 120 data -- attached data 121 122 Returns: 123 SelectorKey instance 124 125 Raises: 126 ValueError if events is invalid 127 KeyError if fileobj is already registered 128 OSError if fileobj is closed or otherwise is unacceptable to 129 the underlying system call (if a system call is made) 130 131 Note: 132 OSError may or may not be raised 133 """ 134 raise NotImplementedError 135 136 @abstractmethod 137 def unregister(self, fileobj): 138 """Unregister a file object. 139 140 Parameters: 141 fileobj -- file object or file descriptor 142 143 Returns: 144 SelectorKey instance 145 146 Raises: 147 KeyError if fileobj is not registered 148 149 Note: 150 If fileobj is registered but has since been closed this does 151 *not* raise OSError (even if the wrapped syscall does) 152 """ 153 raise NotImplementedError 154 155 def modify(self, fileobj, events, data=None): 156 """Change a registered file object monitored events or attached data. 157 158 Parameters: 159 fileobj -- file object or file descriptor 160 events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) 161 data -- attached data 162 163 Returns: 164 SelectorKey instance 165 166 Raises: 167 Anything that unregister() or register() raises 168 """ 169 self.unregister(fileobj) 170 return self.register(fileobj, events, data) 171 172 @abstractmethod 173 def select(self, timeout=None): 174 """Perform the actual selection, until some monitored file objects are 175 ready or a timeout expires. 176 177 Parameters: 178 timeout -- if timeout > 0, this specifies the maximum wait time, in 179 seconds 180 if timeout <= 0, the select() call won't block, and will 181 report the currently ready file objects 182 if timeout is None, select() will block until a monitored 183 file object becomes ready 184 185 Returns: 186 list of (key, events) for ready file objects 187 `events` is a bitwise mask of EVENT_READ|EVENT_WRITE 188 """ 189 raise NotImplementedError 190 191 def close(self): 192 """Close the selector. 193 194 This must be called to make sure that any underlying resource is freed. 195 """ 196 pass 197 198 def get_key(self, fileobj): 199 """Return the key associated to a registered file object. 200 201 Returns: 202 SelectorKey for this file object 203 """ 204 mapping = self.get_map() 205 if mapping is None: 206 raise RuntimeError('Selector is closed') 207 try: 208 return mapping[fileobj] 209 except KeyError: 210 raise KeyError("{0!r} is not registered".format(fileobj)) 211 212 @abstractmethod 213 def get_map(self): 214 """Return a mapping of file objects to selector keys.""" 215 raise NotImplementedError 216 217 def __enter__(self): 218 return self 219 220 def __exit__(self, *args): 221 self.close() 222 223 224class _BaseSelectorImpl(BaseSelector): 225 """Base selector implementation.""" 226 227 def __init__(self): 228 # this maps file descriptors to keys 229 self._fd_to_key = {} 230 # read-only mapping returned by get_map() 231 self._map = _SelectorMapping(self) 232 233 def _fileobj_lookup(self, fileobj): 234 """Return a file descriptor from a file object. 235 236 This wraps _fileobj_to_fd() to do an exhaustive search in case 237 the object is invalid but we still have it in our map. This 238 is used by unregister() so we can unregister an object that 239 was previously registered even if it is closed. It is also 240 used by _SelectorMapping. 241 """ 242 try: 243 return _fileobj_to_fd(fileobj) 244 except ValueError: 245 # Do an exhaustive search. 246 for key in self._fd_to_key.values(): 247 if key.fileobj is fileobj: 248 return key.fd 249 # Raise ValueError after all. 250 raise 251 252 def register(self, fileobj, events, data=None): 253 if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): 254 raise ValueError("Invalid events: {0!r}".format(events)) 255 256 key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) 257 258 if key.fd in self._fd_to_key: 259 raise KeyError("{0!r} (FD {1}) is already registered" 260 .format(fileobj, key.fd)) 261 262 self._fd_to_key[key.fd] = key 263 return key 264 265 def unregister(self, fileobj): 266 try: 267 key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) 268 except KeyError: 269 raise KeyError("{0!r} is not registered".format(fileobj)) 270 return key 271 272 def modify(self, fileobj, events, data=None): 273 # TODO: Subclasses can probably optimize this even further. 274 try: 275 key = self._fd_to_key[self._fileobj_lookup(fileobj)] 276 except KeyError: 277 raise KeyError("{0!r} is not registered".format(fileobj)) 278 if events != key.events: 279 self.unregister(fileobj) 280 key = self.register(fileobj, events, data) 281 elif data != key.data: 282 # Use a shortcut to update the data. 283 key = key._replace(data=data) 284 self._fd_to_key[key.fd] = key 285 return key 286 287 def close(self): 288 self._fd_to_key.clear() 289 self._map = None 290 291 def get_map(self): 292 return self._map 293 294 def _key_from_fd(self, fd): 295 """Return the key associated to a given file descriptor. 296 297 Parameters: 298 fd -- file descriptor 299 300 Returns: 301 corresponding key, or None if not found 302 """ 303 try: 304 return self._fd_to_key[fd] 305 except KeyError: 306 return None 307 308 309class SelectSelector(_BaseSelectorImpl): 310 """Select-based selector.""" 311 312 def __init__(self): 313 super(SelectSelector, self).__init__() 314 self._readers = set() 315 self._writers = set() 316 317 def register(self, fileobj, events, data=None): 318 key = super(SelectSelector, self).register(fileobj, events, data) 319 if events & EVENT_READ: 320 self._readers.add(key.fd) 321 if events & EVENT_WRITE: 322 self._writers.add(key.fd) 323 return key 324 325 def unregister(self, fileobj): 326 key = super(SelectSelector, self).unregister(fileobj) 327 self._readers.discard(key.fd) 328 self._writers.discard(key.fd) 329 return key 330 331 if sys.platform == 'win32': 332 def _select(self, r, w, _, timeout=None): 333 r, w, x = select.select(r, w, w, timeout) 334 return r, w + x, [] 335 else: 336 _select = staticmethod(select.select) 337 338 def select(self, timeout=None): 339 timeout = None if timeout is None else max(timeout, 0) 340 ready = [] 341 try: 342 r, w, _ = self._select(self._readers, self._writers, [], timeout) 343 except select.error as exc: 344 if exc.args[0] == EINTR: 345 return ready 346 else: 347 raise 348 r = set(r) 349 w = set(w) 350 for fd in r | w: 351 events = 0 352 if fd in r: 353 events |= EVENT_READ 354 if fd in w: 355 events |= EVENT_WRITE 356 357 key = self._key_from_fd(fd) 358 if key: 359 ready.append((key, events & key.events)) 360 return ready 361 362 363if hasattr(select, 'poll'): 364 365 class PollSelector(_BaseSelectorImpl): 366 """Poll-based selector.""" 367 368 def __init__(self): 369 super(PollSelector, self).__init__() 370 self._poll = select.poll() 371 372 def register(self, fileobj, events, data=None): 373 key = super(PollSelector, self).register(fileobj, events, data) 374 poll_events = 0 375 if events & EVENT_READ: 376 poll_events |= select.POLLIN 377 if events & EVENT_WRITE: 378 poll_events |= select.POLLOUT 379 self._poll.register(key.fd, poll_events) 380 return key 381 382 def unregister(self, fileobj): 383 key = super(PollSelector, self).unregister(fileobj) 384 self._poll.unregister(key.fd) 385 return key 386 387 def select(self, timeout=None): 388 if timeout is None: 389 timeout = None 390 elif timeout <= 0: 391 timeout = 0 392 else: 393 # poll() has a resolution of 1 millisecond, round away from 394 # zero to wait *at least* timeout seconds. 395 timeout = int(math.ceil(timeout * 1e3)) 396 ready = [] 397 try: 398 fd_event_list = self._poll.poll(timeout) 399 except select.error as exc: 400 if exc.args[0] == EINTR: 401 return ready 402 else: 403 raise 404 for fd, event in fd_event_list: 405 events = 0 406 if event & ~select.POLLIN: 407 events |= EVENT_WRITE 408 if event & ~select.POLLOUT: 409 events |= EVENT_READ 410 411 key = self._key_from_fd(fd) 412 if key: 413 ready.append((key, events & key.events)) 414 return ready 415 416 417if hasattr(select, 'epoll'): 418 419 class EpollSelector(_BaseSelectorImpl): 420 """Epoll-based selector.""" 421 422 def __init__(self): 423 super(EpollSelector, self).__init__() 424 self._epoll = select.epoll() 425 426 def fileno(self): 427 return self._epoll.fileno() 428 429 def register(self, fileobj, events, data=None): 430 key = super(EpollSelector, self).register(fileobj, events, data) 431 epoll_events = 0 432 if events & EVENT_READ: 433 epoll_events |= select.EPOLLIN 434 if events & EVENT_WRITE: 435 epoll_events |= select.EPOLLOUT 436 self._epoll.register(key.fd, epoll_events) 437 return key 438 439 def unregister(self, fileobj): 440 key = super(EpollSelector, self).unregister(fileobj) 441 try: 442 self._epoll.unregister(key.fd) 443 except IOError: 444 # This can happen if the FD was closed since it 445 # was registered. 446 pass 447 return key 448 449 def select(self, timeout=None): 450 if timeout is None: 451 timeout = -1 452 elif timeout <= 0: 453 timeout = 0 454 else: 455 # epoll_wait() has a resolution of 1 millisecond, round away 456 # from zero to wait *at least* timeout seconds. 457 timeout = math.ceil(timeout * 1e3) * 1e-3 458 459 # epoll_wait() expects `maxevents` to be greater than zero; 460 # we want to make sure that `select()` can be called when no 461 # FD is registered. 462 max_ev = max(len(self._fd_to_key), 1) 463 464 ready = [] 465 try: 466 fd_event_list = self._epoll.poll(timeout, max_ev) 467 except IOError as exc: 468 if exc.errno == EINTR: 469 return ready 470 else: 471 raise 472 for fd, event in fd_event_list: 473 events = 0 474 if event & ~select.EPOLLIN: 475 events |= EVENT_WRITE 476 if event & ~select.EPOLLOUT: 477 events |= EVENT_READ 478 479 key = self._key_from_fd(fd) 480 if key: 481 ready.append((key, events & key.events)) 482 return ready 483 484 def close(self): 485 self._epoll.close() 486 super(EpollSelector, self).close() 487 488 489if hasattr(select, 'devpoll'): 490 491 class DevpollSelector(_BaseSelectorImpl): 492 """Solaris /dev/poll selector.""" 493 494 def __init__(self): 495 super(DevpollSelector, self).__init__() 496 self._devpoll = select.devpoll() 497 498 def fileno(self): 499 return self._devpoll.fileno() 500 501 def register(self, fileobj, events, data=None): 502 key = super(DevpollSelector, self).register(fileobj, events, data) 503 poll_events = 0 504 if events & EVENT_READ: 505 poll_events |= select.POLLIN 506 if events & EVENT_WRITE: 507 poll_events |= select.POLLOUT 508 self._devpoll.register(key.fd, poll_events) 509 return key 510 511 def unregister(self, fileobj): 512 key = super(DevpollSelector, self).unregister(fileobj) 513 self._devpoll.unregister(key.fd) 514 return key 515 516 def select(self, timeout=None): 517 if timeout is None: 518 timeout = None 519 elif timeout <= 0: 520 timeout = 0 521 else: 522 # devpoll() has a resolution of 1 millisecond, round away from 523 # zero to wait *at least* timeout seconds. 524 timeout = math.ceil(timeout * 1e3) 525 ready = [] 526 try: 527 fd_event_list = self._devpoll.poll(timeout) 528 except OSError as exc: 529 if exc.errno == EINTR: 530 return ready 531 else: 532 raise 533 for fd, event in fd_event_list: 534 events = 0 535 if event & ~select.POLLIN: 536 events |= EVENT_WRITE 537 if event & ~select.POLLOUT: 538 events |= EVENT_READ 539 540 key = self._key_from_fd(fd) 541 if key: 542 ready.append((key, events & key.events)) 543 return ready 544 545 def close(self): 546 self._devpoll.close() 547 super(DevpollSelector, self).close() 548 549 550if hasattr(select, 'kqueue'): 551 552 class KqueueSelector(_BaseSelectorImpl): 553 """Kqueue-based selector.""" 554 555 def __init__(self): 556 super(KqueueSelector, self).__init__() 557 self._kqueue = select.kqueue() 558 559 def fileno(self): 560 return self._kqueue.fileno() 561 562 def register(self, fileobj, events, data=None): 563 key = super(KqueueSelector, self).register(fileobj, events, data) 564 if events & EVENT_READ: 565 kev = select.kevent(key.fd, select.KQ_FILTER_READ, 566 select.KQ_EV_ADD) 567 self._kqueue.control([kev], 0, 0) 568 if events & EVENT_WRITE: 569 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, 570 select.KQ_EV_ADD) 571 self._kqueue.control([kev], 0, 0) 572 return key 573 574 def unregister(self, fileobj): 575 key = super(KqueueSelector, self).unregister(fileobj) 576 if key.events & EVENT_READ: 577 kev = select.kevent(key.fd, select.KQ_FILTER_READ, 578 select.KQ_EV_DELETE) 579 try: 580 self._kqueue.control([kev], 0, 0) 581 except OSError: 582 # This can happen if the FD was closed since it 583 # was registered. 584 pass 585 if key.events & EVENT_WRITE: 586 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, 587 select.KQ_EV_DELETE) 588 try: 589 self._kqueue.control([kev], 0, 0) 590 except OSError: 591 # See comment above. 592 pass 593 return key 594 595 def select(self, timeout=None): 596 timeout = None if timeout is None else max(timeout, 0) 597 max_ev = len(self._fd_to_key) 598 ready = [] 599 try: 600 kev_list = self._kqueue.control(None, max_ev, timeout) 601 except OSError as exc: 602 if exc.errno == EINTR: 603 return ready 604 else: 605 raise 606 for kev in kev_list: 607 fd = kev.ident 608 flag = kev.filter 609 events = 0 610 if flag == select.KQ_FILTER_READ: 611 events |= EVENT_READ 612 if flag == select.KQ_FILTER_WRITE: 613 events |= EVENT_WRITE 614 615 key = self._key_from_fd(fd) 616 if key: 617 ready.append((key, events & key.events)) 618 return ready 619 620 def close(self): 621 self._kqueue.close() 622 super(KqueueSelector, self).close() 623 624 625# Choose the best implementation, roughly: 626# epoll|kqueue|devpoll > poll > select. 627# select() also can't accept a FD > FD_SETSIZE (usually around 1024) 628if 'KqueueSelector' in globals(): 629 DefaultSelector = KqueueSelector 630elif 'EpollSelector' in globals(): 631 DefaultSelector = EpollSelector 632elif 'DevpollSelector' in globals(): 633 DefaultSelector = DevpollSelector 634elif 'PollSelector' in globals(): 635 DefaultSelector = PollSelector 636else: 637 DefaultSelector = SelectSelector 638