1# 2# A higher level module for using sockets (or Windows named pipes) 3# 4# multiprocessing/connection.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# Licensed to PSF under a Contributor Agreement. 8# 9 10__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] 11 12import io 13import os 14import sys 15import socket 16import struct 17import time 18import tempfile 19import itertools 20 21try: 22 import _multiprocess as _multiprocessing 23except ImportError: 24 import _multiprocessing 25 26from . import util 27 28from . import AuthenticationError, BufferTooShort 29from .context import reduction 30_ForkingPickler = reduction.ForkingPickler 31 32try: 33 import _winapi 34 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE 35except ImportError: 36 if sys.platform == 'win32': 37 raise 38 _winapi = None 39 40# 41# 42# 43 44BUFSIZE = 8192 45# A very generous timeout when it comes to local connections... 46CONNECTION_TIMEOUT = 20. 47 48_mmap_counter = itertools.count() 49 50default_family = 'AF_INET' 51families = ['AF_INET'] 52 53if hasattr(socket, 'AF_UNIX'): 54 default_family = 'AF_UNIX' 55 families += ['AF_UNIX'] 56 57if sys.platform == 'win32': 58 default_family = 'AF_PIPE' 59 families += ['AF_PIPE'] 60 61 62def _init_timeout(timeout=CONNECTION_TIMEOUT): 63 return getattr(time,'monotonic',time.time)() + timeout 64 65def _check_timeout(t): 66 return getattr(time,'monotonic',time.time)() > t 67 68# 69# 70# 71 72def arbitrary_address(family): 73 ''' 74 Return an arbitrary free address for the given family 75 ''' 76 if family == 'AF_INET': 77 return ('localhost', 0) 78 elif family == 'AF_UNIX': 79 return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir()) 80 elif family == 'AF_PIPE': 81 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % 82 (os.getpid(), next(_mmap_counter)), dir="") 83 else: 84 raise ValueError('unrecognized family') 85 86def _validate_family(family): 87 ''' 88 Checks if the family is valid for the current environment. 89 ''' 90 if sys.platform != 'win32' and family == 'AF_PIPE': 91 raise ValueError('Family %s is not recognized.' % family) 92 93 if sys.platform == 'win32' and family == 'AF_UNIX': 94 # double check 95 if not hasattr(socket, family): 96 raise ValueError('Family %s is not recognized.' % family) 97 98def address_type(address): 99 ''' 100 Return the types of the address 101 102 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' 103 ''' 104 if type(address) == tuple: 105 return 'AF_INET' 106 elif type(address) is str and address.startswith('\\\\'): 107 return 'AF_PIPE' 108 elif type(address) is str or util.is_abstract_socket_namespace(address): 109 return 'AF_UNIX' 110 else: 111 raise ValueError('address type of %r unrecognized' % address) 112 113# 114# Connection classes 115# 116 117class _ConnectionBase: 118 _handle = None 119 120 def __init__(self, handle, readable=True, writable=True): 121 handle = handle.__index__() 122 if handle < 0: 123 raise ValueError("invalid handle") 124 if not readable and not writable: 125 raise ValueError( 126 "at least one of `readable` and `writable` must be True") 127 self._handle = handle 128 self._readable = readable 129 self._writable = writable 130 131 # XXX should we use util.Finalize instead of a __del__? 132 133 def __del__(self): 134 if self._handle is not None: 135 self._close() 136 137 def _check_closed(self): 138 if self._handle is None: 139 raise OSError("handle is closed") 140 141 def _check_readable(self): 142 if not self._readable: 143 raise OSError("connection is write-only") 144 145 def _check_writable(self): 146 if not self._writable: 147 raise OSError("connection is read-only") 148 149 def _bad_message_length(self): 150 if self._writable: 151 self._readable = False 152 else: 153 self.close() 154 raise OSError("bad message length") 155 156 @property 157 def closed(self): 158 """True if the connection is closed""" 159 return self._handle is None 160 161 @property 162 def readable(self): 163 """True if the connection is readable""" 164 return self._readable 165 166 @property 167 def writable(self): 168 """True if the connection is writable""" 169 return self._writable 170 171 def fileno(self): 172 """File descriptor or handle of the connection""" 173 self._check_closed() 174 return self._handle 175 176 def close(self): 177 """Close the connection""" 178 if self._handle is not None: 179 try: 180 self._close() 181 finally: 182 self._handle = None 183 184 def send_bytes(self, buf, offset=0, size=None): 185 """Send the bytes data from a bytes-like object""" 186 self._check_closed() 187 self._check_writable() 188 m = memoryview(buf) 189 # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) 190 if m.itemsize > 1: 191 m = memoryview(bytes(m)) 192 n = len(m) 193 if offset < 0: 194 raise ValueError("offset is negative") 195 if n < offset: 196 raise ValueError("buffer length < offset") 197 if size is None: 198 size = n - offset 199 elif size < 0: 200 raise ValueError("size is negative") 201 elif offset + size > n: 202 raise ValueError("buffer length < offset + size") 203 self._send_bytes(m[offset:offset + size]) 204 205 def send(self, obj): 206 """Send a (picklable) object""" 207 self._check_closed() 208 self._check_writable() 209 self._send_bytes(_ForkingPickler.dumps(obj)) 210 211 def recv_bytes(self, maxlength=None): 212 """ 213 Receive bytes data as a bytes object. 214 """ 215 self._check_closed() 216 self._check_readable() 217 if maxlength is not None and maxlength < 0: 218 raise ValueError("negative maxlength") 219 buf = self._recv_bytes(maxlength) 220 if buf is None: 221 self._bad_message_length() 222 return buf.getvalue() 223 224 def recv_bytes_into(self, buf, offset=0): 225 """ 226 Receive bytes data into a writeable bytes-like object. 227 Return the number of bytes read. 228 """ 229 self._check_closed() 230 self._check_readable() 231 with memoryview(buf) as m: 232 # Get bytesize of arbitrary buffer 233 itemsize = m.itemsize 234 bytesize = itemsize * len(m) 235 if offset < 0: 236 raise ValueError("negative offset") 237 elif offset > bytesize: 238 raise ValueError("offset too large") 239 result = self._recv_bytes() 240 size = result.tell() 241 if bytesize < offset + size: 242 raise BufferTooShort(result.getvalue()) 243 # Message can fit in dest 244 result.seek(0) 245 result.readinto(m[offset // itemsize : 246 (offset + size) // itemsize]) 247 return size 248 249 def recv(self): 250 """Receive a (picklable) object""" 251 self._check_closed() 252 self._check_readable() 253 buf = self._recv_bytes() 254 return _ForkingPickler.loads(buf.getbuffer()) 255 256 def poll(self, timeout=0.0): 257 """Whether there is any input available to be read""" 258 self._check_closed() 259 self._check_readable() 260 return self._poll(timeout) 261 262 def __enter__(self): 263 return self 264 265 def __exit__(self, exc_type, exc_value, exc_tb): 266 self.close() 267 268 269if _winapi: 270 271 class PipeConnection(_ConnectionBase): 272 """ 273 Connection class based on a Windows named pipe. 274 Overlapped I/O is used, so the handles must have been created 275 with FILE_FLAG_OVERLAPPED. 276 """ 277 _got_empty_message = False 278 279 def _close(self, _CloseHandle=_winapi.CloseHandle): 280 _CloseHandle(self._handle) 281 282 def _send_bytes(self, buf): 283 ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) 284 try: 285 if err == _winapi.ERROR_IO_PENDING: 286 waitres = _winapi.WaitForMultipleObjects( 287 [ov.event], False, INFINITE) 288 assert waitres == WAIT_OBJECT_0 289 except: 290 ov.cancel() 291 raise 292 finally: 293 nwritten, err = ov.GetOverlappedResult(True) 294 assert err == 0 295 assert nwritten == len(buf) 296 297 def _recv_bytes(self, maxsize=None): 298 if self._got_empty_message: 299 self._got_empty_message = False 300 return io.BytesIO() 301 else: 302 bsize = 128 if maxsize is None else min(maxsize, 128) 303 try: 304 ov, err = _winapi.ReadFile(self._handle, bsize, 305 overlapped=True) 306 try: 307 if err == _winapi.ERROR_IO_PENDING: 308 waitres = _winapi.WaitForMultipleObjects( 309 [ov.event], False, INFINITE) 310 assert waitres == WAIT_OBJECT_0 311 except: 312 ov.cancel() 313 raise 314 finally: 315 nread, err = ov.GetOverlappedResult(True) 316 if err == 0: 317 f = io.BytesIO() 318 f.write(ov.getbuffer()) 319 return f 320 elif err == _winapi.ERROR_MORE_DATA: 321 return self._get_more_data(ov, maxsize) 322 except OSError as e: 323 if e.winerror == _winapi.ERROR_BROKEN_PIPE: 324 raise EOFError 325 else: 326 raise 327 raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") 328 329 def _poll(self, timeout): 330 if (self._got_empty_message or 331 _winapi.PeekNamedPipe(self._handle)[0] != 0): 332 return True 333 return bool(wait([self], timeout)) 334 335 def _get_more_data(self, ov, maxsize): 336 buf = ov.getbuffer() 337 f = io.BytesIO() 338 f.write(buf) 339 left = _winapi.PeekNamedPipe(self._handle)[1] 340 assert left > 0 341 if maxsize is not None and len(buf) + left > maxsize: 342 self._bad_message_length() 343 ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) 344 rbytes, err = ov.GetOverlappedResult(True) 345 assert err == 0 346 assert rbytes == left 347 f.write(ov.getbuffer()) 348 return f 349 350 351class Connection(_ConnectionBase): 352 """ 353 Connection class based on an arbitrary file descriptor (Unix only), or 354 a socket handle (Windows). 355 """ 356 357 if _winapi: 358 def _close(self, _close=_multiprocessing.closesocket): 359 _close(self._handle) 360 _write = _multiprocessing.send 361 _read = _multiprocessing.recv 362 else: 363 def _close(self, _close=os.close): 364 _close(self._handle) 365 _write = os.write 366 _read = os.read 367 368 def _send(self, buf, write=_write): 369 remaining = len(buf) 370 while True: 371 n = write(self._handle, buf) 372 remaining -= n 373 if remaining == 0: 374 break 375 buf = buf[n:] 376 377 def _recv(self, size, read=_read): 378 buf = io.BytesIO() 379 handle = self._handle 380 remaining = size 381 while remaining > 0: 382 chunk = read(handle, remaining) 383 n = len(chunk) 384 if n == 0: 385 if remaining == size: 386 raise EOFError 387 else: 388 raise OSError("got end of file during message") 389 buf.write(chunk) 390 remaining -= n 391 return buf 392 393 def _send_bytes(self, buf): 394 n = len(buf) 395 # For wire compatibility with 3.2 and lower 396 header = struct.pack("!i", n) 397 if n > 16384: 398 # The payload is large so Nagle's algorithm won't be triggered 399 # and we'd better avoid the cost of concatenation. 400 self._send(header) 401 self._send(buf) 402 else: 403 # Issue #20540: concatenate before sending, to avoid delays due 404 # to Nagle's algorithm on a TCP socket. 405 # Also note we want to avoid sending a 0-length buffer separately, 406 # to avoid "broken pipe" errors if the other end closed the pipe. 407 self._send(header + buf) 408 409 def _recv_bytes(self, maxsize=None): 410 buf = self._recv(4) 411 size, = struct.unpack("!i", buf.getvalue()) 412 if maxsize is not None and size > maxsize: 413 return None 414 return self._recv(size) 415 416 def _poll(self, timeout): 417 r = wait([self], timeout) 418 return bool(r) 419 420 421# 422# Public functions 423# 424 425class Listener(object): 426 ''' 427 Returns a listener object. 428 429 This is a wrapper for a bound socket which is 'listening' for 430 connections, or for a Windows named pipe. 431 ''' 432 def __init__(self, address=None, family=None, backlog=1, authkey=None): 433 family = family or (address and address_type(address)) \ 434 or default_family 435 address = address or arbitrary_address(family) 436 437 _validate_family(family) 438 if family == 'AF_PIPE': 439 self._listener = PipeListener(address, backlog) 440 else: 441 self._listener = SocketListener(address, family, backlog) 442 443 if authkey is not None and not isinstance(authkey, bytes): 444 raise TypeError('authkey should be a byte string') 445 446 self._authkey = authkey 447 448 def accept(self): 449 ''' 450 Accept a connection on the bound socket or named pipe of `self`. 451 452 Returns a `Connection` object. 453 ''' 454 if self._listener is None: 455 raise OSError('listener is closed') 456 c = self._listener.accept() 457 if self._authkey: 458 deliver_challenge(c, self._authkey) 459 answer_challenge(c, self._authkey) 460 return c 461 462 def close(self): 463 ''' 464 Close the bound socket or named pipe of `self`. 465 ''' 466 listener = self._listener 467 if listener is not None: 468 self._listener = None 469 listener.close() 470 471 @property 472 def address(self): 473 return self._listener._address 474 475 @property 476 def last_accepted(self): 477 return self._listener._last_accepted 478 479 def __enter__(self): 480 return self 481 482 def __exit__(self, exc_type, exc_value, exc_tb): 483 self.close() 484 485 486def Client(address, family=None, authkey=None): 487 ''' 488 Returns a connection to the address of a `Listener` 489 ''' 490 family = family or address_type(address) 491 _validate_family(family) 492 if family == 'AF_PIPE': 493 c = PipeClient(address) 494 else: 495 c = SocketClient(address) 496 497 if authkey is not None and not isinstance(authkey, bytes): 498 raise TypeError('authkey should be a byte string') 499 500 if authkey is not None: 501 answer_challenge(c, authkey) 502 deliver_challenge(c, authkey) 503 504 return c 505 506 507if sys.platform != 'win32': 508 509 def Pipe(duplex=True): 510 ''' 511 Returns pair of connection objects at either end of a pipe 512 ''' 513 if duplex: 514 s1, s2 = socket.socketpair() 515 s1.setblocking(True) 516 s2.setblocking(True) 517 c1 = Connection(s1.detach()) 518 c2 = Connection(s2.detach()) 519 else: 520 fd1, fd2 = os.pipe() 521 c1 = Connection(fd1, writable=False) 522 c2 = Connection(fd2, readable=False) 523 524 return c1, c2 525 526else: 527 528 def Pipe(duplex=True): 529 ''' 530 Returns pair of connection objects at either end of a pipe 531 ''' 532 address = arbitrary_address('AF_PIPE') 533 if duplex: 534 openmode = _winapi.PIPE_ACCESS_DUPLEX 535 access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE 536 obsize, ibsize = BUFSIZE, BUFSIZE 537 else: 538 openmode = _winapi.PIPE_ACCESS_INBOUND 539 access = _winapi.GENERIC_WRITE 540 obsize, ibsize = 0, BUFSIZE 541 542 h1 = _winapi.CreateNamedPipe( 543 address, openmode | _winapi.FILE_FLAG_OVERLAPPED | 544 _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, 545 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 546 _winapi.PIPE_WAIT, 547 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, 548 # default security descriptor: the handle cannot be inherited 549 _winapi.NULL 550 ) 551 h2 = _winapi.CreateFile( 552 address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, 553 _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL 554 ) 555 _winapi.SetNamedPipeHandleState( 556 h2, _winapi.PIPE_READMODE_MESSAGE, None, None 557 ) 558 559 overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) 560 _, err = overlapped.GetOverlappedResult(True) 561 assert err == 0 562 563 c1 = PipeConnection(h1, writable=duplex) 564 c2 = PipeConnection(h2, readable=duplex) 565 566 return c1, c2 567 568# 569# Definitions for connections based on sockets 570# 571 572class SocketListener(object): 573 ''' 574 Representation of a socket which is bound to an address and listening 575 ''' 576 def __init__(self, address, family, backlog=1): 577 self._socket = socket.socket(getattr(socket, family)) 578 try: 579 # SO_REUSEADDR has different semantics on Windows (issue #2550). 580 if os.name == 'posix': 581 self._socket.setsockopt(socket.SOL_SOCKET, 582 socket.SO_REUSEADDR, 1) 583 self._socket.setblocking(True) 584 self._socket.bind(address) 585 self._socket.listen(backlog) 586 self._address = self._socket.getsockname() 587 except OSError: 588 self._socket.close() 589 raise 590 self._family = family 591 self._last_accepted = None 592 593 if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address): 594 # Linux abstract socket namespaces do not need to be explicitly unlinked 595 self._unlink = util.Finalize( 596 self, os.unlink, args=(address,), exitpriority=0 597 ) 598 else: 599 self._unlink = None 600 601 def accept(self): 602 s, self._last_accepted = self._socket.accept() 603 s.setblocking(True) 604 return Connection(s.detach()) 605 606 def close(self): 607 try: 608 self._socket.close() 609 finally: 610 unlink = self._unlink 611 if unlink is not None: 612 self._unlink = None 613 unlink() 614 615 616def SocketClient(address): 617 ''' 618 Return a connection object connected to the socket given by `address` 619 ''' 620 family = address_type(address) 621 with socket.socket( getattr(socket, family) ) as s: 622 s.setblocking(True) 623 s.connect(address) 624 return Connection(s.detach()) 625 626# 627# Definitions for connections based on named pipes 628# 629 630if sys.platform == 'win32': 631 632 class PipeListener(object): 633 ''' 634 Representation of a named pipe 635 ''' 636 def __init__(self, address, backlog=None): 637 self._address = address 638 self._handle_queue = [self._new_handle(first=True)] 639 640 self._last_accepted = None 641 util.sub_debug('listener created with address=%r', self._address) 642 self.close = util.Finalize( 643 self, PipeListener._finalize_pipe_listener, 644 args=(self._handle_queue, self._address), exitpriority=0 645 ) 646 647 def _new_handle(self, first=False): 648 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 649 if first: 650 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 651 return _winapi.CreateNamedPipe( 652 self._address, flags, 653 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 654 _winapi.PIPE_WAIT, 655 _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, 656 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL 657 ) 658 659 def accept(self): 660 self._handle_queue.append(self._new_handle()) 661 handle = self._handle_queue.pop(0) 662 try: 663 ov = _winapi.ConnectNamedPipe(handle, overlapped=True) 664 except OSError as e: 665 if e.winerror != _winapi.ERROR_NO_DATA: 666 raise 667 # ERROR_NO_DATA can occur if a client has already connected, 668 # written data and then disconnected -- see Issue 14725. 669 else: 670 try: 671 res = _winapi.WaitForMultipleObjects( 672 [ov.event], False, INFINITE) 673 except: 674 ov.cancel() 675 _winapi.CloseHandle(handle) 676 raise 677 finally: 678 _, err = ov.GetOverlappedResult(True) 679 assert err == 0 680 return PipeConnection(handle) 681 682 @staticmethod 683 def _finalize_pipe_listener(queue, address): 684 util.sub_debug('closing listener with address=%r', address) 685 for handle in queue: 686 _winapi.CloseHandle(handle) 687 688 def PipeClient(address): 689 ''' 690 Return a connection object connected to the pipe given by `address` 691 ''' 692 t = _init_timeout() 693 while 1: 694 try: 695 _winapi.WaitNamedPipe(address, 1000) 696 h = _winapi.CreateFile( 697 address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, 698 0, _winapi.NULL, _winapi.OPEN_EXISTING, 699 _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL 700 ) 701 except OSError as e: 702 if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, 703 _winapi.ERROR_PIPE_BUSY) or _check_timeout(t): 704 raise 705 else: 706 break 707 else: 708 raise 709 710 _winapi.SetNamedPipeHandleState( 711 h, _winapi.PIPE_READMODE_MESSAGE, None, None 712 ) 713 return PipeConnection(h) 714 715# 716# Authentication stuff 717# 718 719MESSAGE_LENGTH = 20 720 721CHALLENGE = b'#CHALLENGE#' 722WELCOME = b'#WELCOME#' 723FAILURE = b'#FAILURE#' 724 725def deliver_challenge(connection, authkey): 726 import hmac 727 if not isinstance(authkey, bytes): 728 raise ValueError( 729 "Authkey must be bytes, not {0!s}".format(type(authkey))) 730 message = os.urandom(MESSAGE_LENGTH) 731 connection.send_bytes(CHALLENGE + message) 732 digest = hmac.new(authkey, message, 'md5').digest() 733 response = connection.recv_bytes(256) # reject large message 734 if response == digest: 735 connection.send_bytes(WELCOME) 736 else: 737 connection.send_bytes(FAILURE) 738 raise AuthenticationError('digest received was wrong') 739 740def answer_challenge(connection, authkey): 741 import hmac 742 if not isinstance(authkey, bytes): 743 raise ValueError( 744 "Authkey must be bytes, not {0!s}".format(type(authkey))) 745 message = connection.recv_bytes(256) # reject large message 746 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message 747 message = message[len(CHALLENGE):] 748 digest = hmac.new(authkey, message, 'md5').digest() 749 connection.send_bytes(digest) 750 response = connection.recv_bytes(256) # reject large message 751 if response != WELCOME: 752 raise AuthenticationError('digest sent was rejected') 753 754# 755# Support for using xmlrpclib for serialization 756# 757 758class ConnectionWrapper(object): 759 def __init__(self, conn, dumps, loads): 760 self._conn = conn 761 self._dumps = dumps 762 self._loads = loads 763 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): 764 obj = getattr(conn, attr) 765 setattr(self, attr, obj) 766 def send(self, obj): 767 s = self._dumps(obj) 768 self._conn.send_bytes(s) 769 def recv(self): 770 s = self._conn.recv_bytes() 771 return self._loads(s) 772 773def _xml_dumps(obj): 774 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') 775 776def _xml_loads(s): 777 (obj,), method = xmlrpclib.loads(s.decode('utf-8')) 778 return obj 779 780class XmlListener(Listener): 781 def accept(self): 782 global xmlrpclib 783 import xmlrpc.client as xmlrpclib 784 obj = Listener.accept(self) 785 return ConnectionWrapper(obj, _xml_dumps, _xml_loads) 786 787def XmlClient(*args, **kwds): 788 global xmlrpclib 789 import xmlrpc.client as xmlrpclib 790 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) 791 792# 793# Wait 794# 795 796if sys.platform == 'win32': 797 798 def _exhaustive_wait(handles, timeout): 799 # Return ALL handles which are currently signalled. (Only 800 # returning the first signalled might create starvation issues.) 801 L = list(handles) 802 ready = [] 803 while L: 804 res = _winapi.WaitForMultipleObjects(L, False, timeout) 805 if res == WAIT_TIMEOUT: 806 break 807 elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): 808 res -= WAIT_OBJECT_0 809 elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): 810 res -= WAIT_ABANDONED_0 811 else: 812 raise RuntimeError('Should not get here') 813 ready.append(L[res]) 814 L = L[res+1:] 815 timeout = 0 816 return ready 817 818 _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} 819 820 def wait(object_list, timeout=None): 821 ''' 822 Wait till an object in object_list is ready/readable. 823 824 Returns list of those objects in object_list which are ready/readable. 825 ''' 826 if timeout is None: 827 timeout = INFINITE 828 elif timeout < 0: 829 timeout = 0 830 else: 831 timeout = int(timeout * 1000 + 0.5) 832 833 object_list = list(object_list) 834 waithandle_to_obj = {} 835 ov_list = [] 836 ready_objects = set() 837 ready_handles = set() 838 839 try: 840 for o in object_list: 841 try: 842 fileno = getattr(o, 'fileno') 843 except AttributeError: 844 waithandle_to_obj[o.__index__()] = o 845 else: 846 # start an overlapped read of length zero 847 try: 848 ov, err = _winapi.ReadFile(fileno(), 0, True) 849 except OSError as e: 850 ov, err = None, e.winerror 851 if err not in _ready_errors: 852 raise 853 if err == _winapi.ERROR_IO_PENDING: 854 ov_list.append(ov) 855 waithandle_to_obj[ov.event] = o 856 else: 857 # If o.fileno() is an overlapped pipe handle and 858 # err == 0 then there is a zero length message 859 # in the pipe, but it HAS NOT been consumed... 860 if ov and sys.getwindowsversion()[:2] >= (6, 2): 861 # ... except on Windows 8 and later, where 862 # the message HAS been consumed. 863 try: 864 _, err = ov.GetOverlappedResult(False) 865 except OSError as e: 866 err = e.winerror 867 if not err and hasattr(o, '_got_empty_message'): 868 o._got_empty_message = True 869 ready_objects.add(o) 870 timeout = 0 871 872 ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) 873 finally: 874 # request that overlapped reads stop 875 for ov in ov_list: 876 ov.cancel() 877 878 # wait for all overlapped reads to stop 879 for ov in ov_list: 880 try: 881 _, err = ov.GetOverlappedResult(True) 882 except OSError as e: 883 err = e.winerror 884 if err not in _ready_errors: 885 raise 886 if err != _winapi.ERROR_OPERATION_ABORTED: 887 o = waithandle_to_obj[ov.event] 888 ready_objects.add(o) 889 if err == 0: 890 # If o.fileno() is an overlapped pipe handle then 891 # a zero length message HAS been consumed. 892 if hasattr(o, '_got_empty_message'): 893 o._got_empty_message = True 894 895 ready_objects.update(waithandle_to_obj[h] for h in ready_handles) 896 return [o for o in object_list if o in ready_objects] 897 898else: 899 900 import selectors 901 902 # poll/select have the advantage of not requiring any extra file 903 # descriptor, contrarily to epoll/kqueue (also, they require a single 904 # syscall). 905 if hasattr(selectors, 'PollSelector'): 906 _WaitSelector = selectors.PollSelector 907 else: 908 _WaitSelector = selectors.SelectSelector 909 910 def wait(object_list, timeout=None): 911 ''' 912 Wait till an object in object_list is ready/readable. 913 914 Returns list of those objects in object_list which are ready/readable. 915 ''' 916 with _WaitSelector() as selector: 917 for obj in object_list: 918 selector.register(obj, selectors.EVENT_READ) 919 920 if timeout is not None: 921 deadline = getattr(time,'monotonic',time.time)() + timeout 922 923 while True: 924 ready = selector.select(timeout) 925 if ready: 926 return [key.fileobj for (key, events) in ready] 927 else: 928 if timeout is not None: 929 timeout = deadline - getattr(time,'monotonic',time.time)() 930 if timeout < 0: 931 return ready 932 933# 934# Make connection and socket objects sharable if possible 935# 936 937if sys.platform == 'win32': 938 def reduce_connection(conn): 939 handle = conn.fileno() 940 with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: 941 from . import resource_sharer 942 ds = resource_sharer.DupSocket(s) 943 return rebuild_connection, (ds, conn.readable, conn.writable) 944 def rebuild_connection(ds, readable, writable): 945 sock = ds.detach() 946 return Connection(sock.detach(), readable, writable) 947 reduction.register(Connection, reduce_connection) 948 949 def reduce_pipe_connection(conn): 950 access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | 951 (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) 952 dh = reduction.DupHandle(conn.fileno(), access) 953 return rebuild_pipe_connection, (dh, conn.readable, conn.writable) 954 def rebuild_pipe_connection(dh, readable, writable): 955 handle = dh.detach() 956 return PipeConnection(handle, readable, writable) 957 reduction.register(PipeConnection, reduce_pipe_connection) 958 959else: 960 def reduce_connection(conn): 961 df = reduction.DupFd(conn.fileno()) 962 return rebuild_connection, (df, conn.readable, conn.writable) 963 def rebuild_connection(df, readable, writable): 964 fd = df.detach() 965 return Connection(fd, readable, writable) 966 reduction.register(Connection, reduce_connection) 967