#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import io
import os
import sys
import socket
import struct
import time
import tempfile
import itertools

import _multiprocessing

from . import util

from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler

try:
    import _winapi
    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
except ImportError:
    if sys.platform == 'win32':
        raise
    _winapi = None

#
#
#

BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

_mmap_counter = itertools.count()

default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the monotonic-clock deadline that lies `timeout` seconds from now
    '''
    return time.monotonic() + timeout

def _check_timeout(t):
    '''
    Return True if the monotonic-clock deadline `t` has passed
    '''
    return time.monotonic() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')

def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)

def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
    '''
    if type(address) == tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str or util.is_abstract_socket_namespace(address):
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Connection classes
#

class _ConnectionBase:
    '''
    Common message-oriented API shared by the connection classes.

    Subclasses supply `_close()`, `_send_bytes()`, `_recv_bytes()` and
    `_poll()`; this class adds state checks and argument validation.
    '''
    # Class-level default so that __del__ is safe even if __init__ raised
    # before the instance attribute was assigned.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        '''
        Wrap `handle` (anything with `__index__`); at least one of
        `readable` and `writable` must be true.

        Raises ValueError for a negative handle or if both flags are false.
        '''
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        '''Raise OSError if the connection has been closed'''
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        '''Raise OSError if the connection is not readable'''
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        '''Raise OSError if the connection is not writable'''
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        '''
        Disable further reads (or close, if the connection is not writable)
        and raise OSError.
        '''
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Mark as closed even when _close() raises.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object"""
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if m.itemsize > 1:
            m = memoryview(bytes(m))
        n = len(m)
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            # _bad_message_length() always raises OSError.
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE: this unpickles whatever the peer sent; only use with
        # trusted (or authenticated) peers.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set by wait() when a zero length message has already been consumed
        # from the pipe; the next _recv_bytes() then returns an empty buffer.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            '''Write `buf` to the pipe as one overlapped write'''
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            '''
            Read one message from the pipe and return it as an io.BytesIO.

            Raises EOFError if the pipe is broken.
            '''
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            '''
            Read the remainder of a message whose first part is held by `ov`
            and return the whole message as an io.BytesIO.
            '''
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f


class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        '''Write all of `buf` to the handle, looping over partial writes'''
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        '''
        Read exactly `size` bytes from the handle into a new io.BytesIO.

        Raises EOFError if end of file is hit before any byte is read, and
        OSError if it is hit part way through.
        '''
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        '''
        Send `buf` preceded by a length header: a signed 32-bit size, or
        -1 followed by an unsigned 64-bit size for messages over 0x7fffffff
        bytes.
        '''
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            # For wire compatibility with 3.7 and lower
            header = struct.pack("!i", n)
            if n > 16384:
                # The payload is large so Nagle's algorithm won't be triggered
                # and we'd better avoid the cost of concatenation.
                self._send(header)
                self._send(buf)
            else:
                # Issue #20540: concatenate before sending, to avoid delays due
                # to Nagle's algorithm on a TCP socket.
                # Also note we want to avoid sending a 0-length buffer separately,
                # to avoid "broken pipe" errors if the other end closed the pipe.
                self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        '''
        Read one length-prefixed message and return it as an io.BytesIO, or
        return None if its announced size exceeds `maxsize`.
        '''
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        r = wait([self], timeout)
        return bool(r)


#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate `authkey` before binding anything, so that a bad key
        # does not leave a bound socket / named pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # Use the same `is not None` test as Client() so that both ends
        # agree on whether the handshake takes place (e.g. for b'').
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # The caller never sees `c`, so release it right away.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            self._listener = None
            listener.close()

    @property
    def address(self):
        '''The address of the underlying listener'''
        return self._listener._address

    @property
    def last_accepted(self):
        '''The `_last_accepted` value recorded by the underlying listener'''
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`
    '''
    # Validate `authkey` before connecting, so that a bad key does not
    # leave an open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            # The caller never sees `c`, so release it right away.
            c.close()
            raise

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except OSError:
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
            # Linux abstract socket namespaces do not need to be explicitly unlinked
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept one connection and return it as a `Connection`; the peer
        address is stored in `_last_accepted`.
        '''
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        '''Close the socket and run the pending unlink finalizer, if any'''
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    with socket.socket( getattr(socket, family) ) as s:
        s.setblocking(True)
        s.connect(address)
        return Connection(s.detach())

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            '''Create and return a new instance of the named pipe'''
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            '''
            Wait for a client to connect to the oldest queued pipe instance
            and return it as a `PipeConnection`.
            '''
            # Queue a fresh instance first, then serve the oldest one.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            '''Close every pipe handle still held in `queue`'''
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        t = _init_timeout()
        # Retry until the pipe can be opened; the loop is left only through
        # `break` (success) or by re-raising the OSError below.
        while 1:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Send a random challenge over `connection` and verify the peer's reply.

    The expected reply is the HMAC-MD5 digest of the random message keyed
    with `authkey`.  Sends WELCOME on success; on mismatch sends FAILURE and
    raises AuthenticationError.  Raises ValueError if `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, so that timing does not reveal how much of
    # the digest the peer guessed correctly.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Receive a challenge over `connection` and reply with its digest.

    The reply is the HMAC-MD5 digest of the challenge payload keyed with
    `authkey`.  Raises AuthenticationError if the received message is not a
    challenge or if the peer does not answer WELCOME, and ValueError if
    `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # The message comes from the peer, so check it with a real exception
    # rather than an `assert`, which is removed when running with -O.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#

class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use the given `dumps`/`loads`
    callables; the byte-level methods are forwarded to the wrapped connection.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        '''Serialize `obj` with the wrapper's `dumps` and send the bytes'''
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        '''Receive bytes and deserialize them with the wrapper's `loads`'''
        s = self._conn.recv_bytes()
        return self._loads(s)

# NOTE: `xmlrpclib` is bound as a module global by XmlListener.accept() and
# XmlClient(), which is where these two helpers are handed out.
def _xml_dumps(obj):
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')

def _xml_loads(s):
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj

class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects with xmlrpc
    '''
    def accept(self):
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    '''
    Like Client(), but the returned connection serializes objects with xmlrpc
    '''
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)

#
# Wait
#

if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        L = list(handles)
        ready = []
        while L:
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
                res -= WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
                res -= WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(L[res])
            L = L[res+1:]
            timeout = 0
        return ready

    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        # Convert seconds to the integer milliseconds used by _winapi.
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        waithandle_to_obj = {}
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        return [o for o in object_list if o in ready_objects]

else:

    import selectors

    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrarily to epoll/kqueue (also, they require a single
    # syscall).
    if hasattr(selectors, 'PollSelector'):
        _WaitSelector = selectors.PollSelector
    else:
        _WaitSelector = selectors.SelectSelector

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        with _WaitSelector() as selector:
            for obj in object_list:
                selector.register(obj, selectors.EVENT_READ)

            if timeout is not None:
                deadline = time.monotonic() + timeout

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, events) in ready]
                else:
                    # Nothing ready: recompute the remaining time and give
                    # up (returning the empty list) once it has run out.
                    if timeout is not None:
                        timeout = deadline - time.monotonic()
                        if timeout < 0:
                            return ready

#
# Make connection and socket objects sharable if possible
#

if sys.platform == 'win32':
    def reduce_connection(conn):
        '''Reduction function registered for `Connection` (Windows)'''
        handle = conn.fileno()
        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
            from . import resource_sharer
            ds = resource_sharer.DupSocket(s)
            return rebuild_connection, (ds, conn.readable, conn.writable)
    def rebuild_connection(ds, readable, writable):
        '''Rebuild a `Connection` from the arguments made by reduce_connection'''
        sock = ds.detach()
        return Connection(sock.detach(), readable, writable)
    reduction.register(Connection, reduce_connection)

    def reduce_pipe_connection(conn):
        '''Reduction function registered for `PipeConnection`'''
        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
        dh = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
    def rebuild_pipe_connection(dh, readable, writable):
        '''Rebuild a `PipeConnection` from the arguments made by reduce_pipe_connection'''
        handle = dh.detach()
        return PipeConnection(handle, readable, writable)
    reduction.register(PipeConnection, reduce_pipe_connection)

else:
    def reduce_connection(conn):
        '''Reduction function registered for `Connection` (non-Windows)'''
        df = reduction.DupFd(conn.fileno())
        return rebuild_connection, (df, conn.readable, conn.writable)
    def rebuild_connection(df, readable, writable):
        '''Rebuild a `Connection` from the arguments made by reduce_connection'''
        fd = df.detach()
        return Connection(fd, readable, writable)
    reduction.register(Connection, reduce_connection)