1# 2# A higher level module for using sockets (or Windows named pipes) 3# 4# multiprocessing/connection.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# Licensed to PSF under a Contributor Agreement. 8# 9 10__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] 11 12import io 13import os 14import sys 15import socket 16import struct 17import time 18import tempfile 19import itertools 20 21try: 22 import _multiprocess as _multiprocessing 23except ImportError: 24 import _multiprocessing 25 26from . import reduction 27from . import util 28 29from . import AuthenticationError, BufferTooShort 30from .reduction import ForkingPickler 31 32try: 33 import _winapi 34 from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE 35except ImportError: 36 if sys.platform == 'win32': 37 raise 38 _winapi = None 39 40# 41# 42# 43 44BUFSIZE = 8192 45# A very generous timeout when it comes to local connections... 46CONNECTION_TIMEOUT = 20. 47 48_mmap_counter = itertools.count() 49 50default_family = 'AF_INET' 51families = ['AF_INET'] 52 53if hasattr(socket, 'AF_UNIX'): 54 default_family = 'AF_UNIX' 55 families += ['AF_UNIX'] 56 57if sys.platform == 'win32': 58 default_family = 'AF_PIPE' 59 families += ['AF_PIPE'] 60 61 62def _init_timeout(timeout=CONNECTION_TIMEOUT): 63 return time.time() + timeout 64 65def _check_timeout(t): 66 return time.time() > t 67 68# 69# 70# 71 72def arbitrary_address(family): 73 ''' 74 Return an arbitrary free address for the given family 75 ''' 76 if family == 'AF_INET': 77 return ('localhost', 0) 78 elif family == 'AF_UNIX': 79 return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir()) 80 elif family == 'AF_PIPE': 81 return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % 82 (os.getpid(), next(_mmap_counter)), dir="") 83 else: 84 raise ValueError('unrecognized family') 85 86def _validate_family(family): 87 ''' 88 Checks if the family is valid for the current environment. 89 ''' 90 if sys.platform != 'win32' and family == 'AF_PIPE': 91 raise ValueError('Family %s is not recognized.' % family) 92 93 if sys.platform == 'win32' and family == 'AF_UNIX': 94 # double check 95 if not hasattr(socket, family): 96 raise ValueError('Family %s is not recognized.' % family) 97 98def address_type(address): 99 ''' 100 Return the types of the address 101 102 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' 103 ''' 104 if type(address) == tuple: 105 return 'AF_INET' 106 elif type(address) is str and address.startswith('\\\\'): 107 return 'AF_PIPE' 108 elif type(address) is str: 109 return 'AF_UNIX' 110 else: 111 raise ValueError('address type of %r unrecognized' % address) 112 113# 114# Connection classes 115# 116 117class _ConnectionBase: 118 _handle = None 119 120 def __init__(self, handle, readable=True, writable=True): 121 handle = handle.__index__() 122 if handle < 0: 123 raise ValueError("invalid handle") 124 if not readable and not writable: 125 raise ValueError( 126 "at least one of `readable` and `writable` must be True") 127 self._handle = handle 128 self._readable = readable 129 self._writable = writable 130 131 # XXX should we use util.Finalize instead of a __del__? 132 133 def __del__(self): 134 if self._handle is not None: 135 self._close() 136 137 def _check_closed(self): 138 if self._handle is None: 139 raise OSError("handle is closed") 140 141 def _check_readable(self): 142 if not self._readable: 143 raise OSError("connection is write-only") 144 145 def _check_writable(self): 146 if not self._writable: 147 raise OSError("connection is read-only") 148 149 def _bad_message_length(self): 150 if self._writable: 151 self._readable = False 152 else: 153 self.close() 154 raise OSError("bad message length") 155 156 @property 157 def closed(self): 158 """True if the connection is closed""" 159 return self._handle is None 160 161 @property 162 def readable(self): 163 """True if the connection is readable""" 164 return self._readable 165 166 @property 167 def writable(self): 168 """True if the connection is writable""" 169 return self._writable 170 171 def fileno(self): 172 """File descriptor or handle of the connection""" 173 self._check_closed() 174 return self._handle 175 176 def close(self): 177 """Close the connection""" 178 if self._handle is not None: 179 try: 180 self._close() 181 finally: 182 self._handle = None 183 184 def send_bytes(self, buf, offset=0, size=None): 185 """Send the bytes data from a bytes-like object""" 186 self._check_closed() 187 self._check_writable() 188 m = memoryview(buf) 189 # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) 190 if m.itemsize > 1: 191 m = memoryview(bytes(m)) 192 n = len(m) 193 if offset < 0: 194 raise ValueError("offset is negative") 195 if n < offset: 196 raise ValueError("buffer length < offset") 197 if size is None: 198 size = n - offset 199 elif size < 0: 200 raise ValueError("size is negative") 201 elif offset + size > n: 202 raise ValueError("buffer length < offset + size") 203 self._send_bytes(m[offset:offset + size]) 204 205 def send(self, obj): 206 """Send a (picklable) object""" 207 self._check_closed() 208 self._check_writable() 209 self._send_bytes(ForkingPickler.dumps(obj)) 210 211 def recv_bytes(self, maxlength=None): 212 """ 213 Receive bytes data as a bytes object. 214 """ 215 self._check_closed() 216 self._check_readable() 217 if maxlength is not None and maxlength < 0: 218 raise ValueError("negative maxlength") 219 buf = self._recv_bytes(maxlength) 220 if buf is None: 221 self._bad_message_length() 222 return buf.getvalue() 223 224 def recv_bytes_into(self, buf, offset=0): 225 """ 226 Receive bytes data into a writeable bytes-like object. 227 Return the number of bytes read. 228 """ 229 self._check_closed() 230 self._check_readable() 231 with memoryview(buf) as m: 232 # Get bytesize of arbitrary buffer 233 itemsize = m.itemsize 234 bytesize = itemsize * len(m) 235 if offset < 0: 236 raise ValueError("negative offset") 237 elif offset > bytesize: 238 raise ValueError("offset too large") 239 result = self._recv_bytes() 240 size = result.tell() 241 if bytesize < offset + size: 242 raise BufferTooShort(result.getvalue()) 243 # Message can fit in dest 244 result.seek(0) 245 result.readinto(m[offset // itemsize : 246 (offset + size) // itemsize]) 247 return size 248 249 def recv(self): 250 """Receive a (picklable) object""" 251 self._check_closed() 252 self._check_readable() 253 buf = self._recv_bytes() 254 return ForkingPickler.loads(buf.getbuffer()) 255 256 def poll(self, timeout=0.0): 257 """Whether there is any input available to be read""" 258 self._check_closed() 259 self._check_readable() 260 return self._poll(timeout) 261 262 def __enter__(self): 263 return self 264 265 def __exit__(self, exc_type, exc_value, exc_tb): 266 self.close() 267 268 269if _winapi: 270 271 class PipeConnection(_ConnectionBase): 272 """ 273 Connection class based on a Windows named pipe. 274 Overlapped I/O is used, so the handles must have been created 275 with FILE_FLAG_OVERLAPPED. 276 """ 277 _got_empty_message = False 278 279 def _close(self, _CloseHandle=_winapi.CloseHandle): 280 _CloseHandle(self._handle) 281 282 def _send_bytes(self, buf): 283 ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) 284 try: 285 if err == _winapi.ERROR_IO_PENDING: 286 waitres = _winapi.WaitForMultipleObjects( 287 [ov.event], False, INFINITE) 288 assert waitres == WAIT_OBJECT_0 289 except: 290 ov.cancel() 291 raise 292 finally: 293 nwritten, err = ov.GetOverlappedResult(True) 294 assert err == 0 295 assert nwritten == len(buf) 296 297 def _recv_bytes(self, maxsize=None): 298 if self._got_empty_message: 299 self._got_empty_message = False 300 return io.BytesIO() 301 else: 302 bsize = 128 if maxsize is None else min(maxsize, 128) 303 try: 304 ov, err = _winapi.ReadFile(self._handle, bsize, 305 overlapped=True) 306 try: 307 if err == _winapi.ERROR_IO_PENDING: 308 waitres = _winapi.WaitForMultipleObjects( 309 [ov.event], False, INFINITE) 310 assert waitres == WAIT_OBJECT_0 311 except: 312 ov.cancel() 313 raise 314 finally: 315 nread, err = ov.GetOverlappedResult(True) 316 if err == 0: 317 f = io.BytesIO() 318 f.write(ov.getbuffer()) 319 return f 320 elif err == _winapi.ERROR_MORE_DATA: 321 return self._get_more_data(ov, maxsize) 322 except OSError as e: 323 if e.winerror == _winapi.ERROR_BROKEN_PIPE: 324 raise EOFError 325 else: 326 raise 327 raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") 328 329 def _poll(self, timeout): 330 if (self._got_empty_message or 331 _winapi.PeekNamedPipe(self._handle)[0] != 0): 332 return True 333 return bool(wait([self], timeout)) 334 335 def _get_more_data(self, ov, maxsize): 336 buf = ov.getbuffer() 337 f = io.BytesIO() 338 f.write(buf) 339 left = _winapi.PeekNamedPipe(self._handle)[1] 340 assert left > 0 341 if maxsize is not None and len(buf) + left > maxsize: 342 self._bad_message_length() 343 ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) 344 rbytes, err = ov.GetOverlappedResult(True) 345 assert err == 0 346 assert rbytes == left 347 f.write(ov.getbuffer()) 348 return f 349 350 351class Connection(_ConnectionBase): 352 """ 353 Connection class based on an arbitrary file descriptor (Unix only), or 354 a socket handle (Windows). 355 """ 356 357 if _winapi: 358 def _close(self, _close=_multiprocessing.closesocket): 359 _close(self._handle) 360 _write = _multiprocessing.send 361 _read = _multiprocessing.recv 362 else: 363 def _close(self, _close=os.close): 364 _close(self._handle) 365 _write = os.write 366 _read = os.read 367 368 def _send(self, buf, write=_write): 369 remaining = len(buf) 370 while True: 371 try: 372 n = write(self._handle, buf) 373 except InterruptedError: 374 continue 375 remaining -= n 376 if remaining == 0: 377 break 378 buf = buf[n:] 379 380 def _recv(self, size, read=_read): 381 buf = io.BytesIO() 382 handle = self._handle 383 remaining = size 384 while remaining > 0: 385 try: 386 chunk = read(handle, remaining) 387 except InterruptedError: 388 continue 389 n = len(chunk) 390 if n == 0: 391 if remaining == size: 392 raise EOFError 393 else: 394 raise OSError("got end of file during message") 395 buf.write(chunk) 396 remaining -= n 397 return buf 398 399 def _send_bytes(self, buf): 400 n = len(buf) 401 # For wire compatibility with 3.2 and lower 402 header = struct.pack("!i", n) 403 if n > 16384: 404 # The payload is large so Nagle's algorithm won't be triggered 405 # and we'd better avoid the cost of concatenation. 406 chunks = [header, buf] 407 elif n > 0: 408 # Issue # 20540: concatenate before sending, to avoid delays due 409 # to Nagle's algorithm on a TCP socket. 410 chunks = [header + buf] 411 else: 412 # This code path is necessary to avoid "broken pipe" errors 413 # when sending a 0-length buffer if the other end closed the pipe. 414 chunks = [header] 415 for chunk in chunks: 416 self._send(chunk) 417 418 def _recv_bytes(self, maxsize=None): 419 buf = self._recv(4) 420 size, = struct.unpack("!i", buf.getvalue()) 421 if maxsize is not None and size > maxsize: 422 return None 423 return self._recv(size) 424 425 def _poll(self, timeout): 426 r = wait([self], timeout) 427 return bool(r) 428 429 430# 431# Public functions 432# 433 434class Listener(object): 435 ''' 436 Returns a listener object. 437 438 This is a wrapper for a bound socket which is 'listening' for 439 connections, or for a Windows named pipe. 440 ''' 441 def __init__(self, address=None, family=None, backlog=1, authkey=None): 442 family = family or (address and address_type(address)) \ 443 or default_family 444 address = address or arbitrary_address(family) 445 446 _validate_family(family) 447 if family == 'AF_PIPE': 448 self._listener = PipeListener(address, backlog) 449 else: 450 self._listener = SocketListener(address, family, backlog) 451 452 if authkey is not None and not isinstance(authkey, bytes): 453 raise TypeError('authkey should be a byte string') 454 455 self._authkey = authkey 456 457 def accept(self): 458 ''' 459 Accept a connection on the bound socket or named pipe of `self`. 460 461 Returns a `Connection` object. 462 ''' 463 if self._listener is None: 464 raise OSError('listener is closed') 465 c = self._listener.accept() 466 if self._authkey: 467 deliver_challenge(c, self._authkey) 468 answer_challenge(c, self._authkey) 469 return c 470 471 def close(self): 472 ''' 473 Close the bound socket or named pipe of `self`. 474 ''' 475 listener = self._listener 476 if listener is not None: 477 self._listener = None 478 listener.close() 479 480 address = property(lambda self: self._listener._address) 481 last_accepted = property(lambda self: self._listener._last_accepted) 482 483 def __enter__(self): 484 return self 485 486 def __exit__(self, exc_type, exc_value, exc_tb): 487 self.close() 488 489 490def Client(address, family=None, authkey=None): 491 ''' 492 Returns a connection to the address of a `Listener` 493 ''' 494 family = family or address_type(address) 495 _validate_family(family) 496 if family == 'AF_PIPE': 497 c = PipeClient(address) 498 else: 499 c = SocketClient(address) 500 501 if authkey is not None and not isinstance(authkey, bytes): 502 raise TypeError('authkey should be a byte string') 503 504 if authkey is not None: 505 answer_challenge(c, authkey) 506 deliver_challenge(c, authkey) 507 508 return c 509 510 511if sys.platform != 'win32': 512 513 def Pipe(duplex=True): 514 ''' 515 Returns pair of connection objects at either end of a pipe 516 ''' 517 if duplex: 518 s1, s2 = socket.socketpair() 519 s1.setblocking(True) 520 s2.setblocking(True) 521 c1 = Connection(s1.detach()) 522 c2 = Connection(s2.detach()) 523 else: 524 fd1, fd2 = os.pipe() 525 c1 = Connection(fd1, writable=False) 526 c2 = Connection(fd2, readable=False) 527 528 return c1, c2 529 530else: 531 532 def Pipe(duplex=True): 533 ''' 534 Returns pair of connection objects at either end of a pipe 535 ''' 536 address = arbitrary_address('AF_PIPE') 537 if duplex: 538 openmode = _winapi.PIPE_ACCESS_DUPLEX 539 access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE 540 obsize, ibsize = BUFSIZE, BUFSIZE 541 else: 542 openmode = _winapi.PIPE_ACCESS_INBOUND 543 access = _winapi.GENERIC_WRITE 544 obsize, ibsize = 0, BUFSIZE 545 546 h1 = _winapi.CreateNamedPipe( 547 address, openmode | _winapi.FILE_FLAG_OVERLAPPED | 548 _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, 549 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 550 _winapi.PIPE_WAIT, 551 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, 552 # default security descriptor: the handle cannot be inherited 553 _winapi.NULL 554 ) 555 h2 = _winapi.CreateFile( 556 address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, 557 _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL 558 ) 559 _winapi.SetNamedPipeHandleState( 560 h2, _winapi.PIPE_READMODE_MESSAGE, None, None 561 ) 562 563 overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) 564 _, err = overlapped.GetOverlappedResult(True) 565 assert err == 0 566 567 c1 = PipeConnection(h1, writable=duplex) 568 c2 = PipeConnection(h2, readable=duplex) 569 570 return c1, c2 571 572# 573# Definitions for connections based on sockets 574# 575 576class SocketListener(object): 577 ''' 578 Representation of a socket which is bound to an address and listening 579 ''' 580 def __init__(self, address, family, backlog=1): 581 self._socket = socket.socket(getattr(socket, family)) 582 try: 583 # SO_REUSEADDR has different semantics on Windows (issue #2550). 584 if os.name == 'posix': 585 self._socket.setsockopt(socket.SOL_SOCKET, 586 socket.SO_REUSEADDR, 1) 587 self._socket.setblocking(True) 588 self._socket.bind(address) 589 self._socket.listen(backlog) 590 self._address = self._socket.getsockname() 591 except OSError: 592 self._socket.close() 593 raise 594 self._family = family 595 self._last_accepted = None 596 597 if family == 'AF_UNIX': 598 self._unlink = util.Finalize( 599 self, os.unlink, args=(address,), exitpriority=0 600 ) 601 else: 602 self._unlink = None 603 604 def accept(self): 605 while True: 606 try: 607 s, self._last_accepted = self._socket.accept() 608 except InterruptedError: 609 pass 610 else: 611 break 612 s.setblocking(True) 613 return Connection(s.detach()) 614 615 def close(self): 616 try: 617 self._socket.close() 618 finally: 619 unlink = self._unlink 620 if unlink is not None: 621 self._unlink = None 622 unlink() 623 624 625def SocketClient(address): 626 ''' 627 Return a connection object connected to the socket given by `address` 628 ''' 629 family = address_type(address) 630 with socket.socket( getattr(socket, family) ) as s: 631 s.setblocking(True) 632 s.connect(address) 633 return Connection(s.detach()) 634 635# 636# Definitions for connections based on named pipes 637# 638 639if sys.platform == 'win32': 640 641 class PipeListener(object): 642 ''' 643 Representation of a named pipe 644 ''' 645 def __init__(self, address, backlog=None): 646 self._address = address 647 self._handle_queue = [self._new_handle(first=True)] 648 649 self._last_accepted = None 650 util.sub_debug('listener created with address=%r', self._address) 651 self.close = util.Finalize( 652 self, PipeListener._finalize_pipe_listener, 653 args=(self._handle_queue, self._address), exitpriority=0 654 ) 655 656 def _new_handle(self, first=False): 657 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 658 if first: 659 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 660 return _winapi.CreateNamedPipe( 661 self._address, flags, 662 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 663 _winapi.PIPE_WAIT, 664 _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, 665 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL 666 ) 667 668 def accept(self): 669 self._handle_queue.append(self._new_handle()) 670 handle = self._handle_queue.pop(0) 671 try: 672 ov = _winapi.ConnectNamedPipe(handle, overlapped=True) 673 except OSError as e: 674 if e.winerror != _winapi.ERROR_NO_DATA: 675 raise 676 # ERROR_NO_DATA can occur if a client has already connected, 677 # written data and then disconnected -- see Issue 14725. 678 else: 679 try: 680 res = _winapi.WaitForMultipleObjects( 681 [ov.event], False, INFINITE) 682 except: 683 ov.cancel() 684 _winapi.CloseHandle(handle) 685 raise 686 finally: 687 _, err = ov.GetOverlappedResult(True) 688 assert err == 0 689 return PipeConnection(handle) 690 691 @staticmethod 692 def _finalize_pipe_listener(queue, address): 693 util.sub_debug('closing listener with address=%r', address) 694 for handle in queue: 695 _winapi.CloseHandle(handle) 696 697 def PipeClient(address): 698 ''' 699 Return a connection object connected to the pipe given by `address` 700 ''' 701 t = _init_timeout() 702 while 1: 703 try: 704 _winapi.WaitNamedPipe(address, 1000) 705 h = _winapi.CreateFile( 706 address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, 707 0, _winapi.NULL, _winapi.OPEN_EXISTING, 708 _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL 709 ) 710 except OSError as e: 711 if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, 712 _winapi.ERROR_PIPE_BUSY) or _check_timeout(t): 713 raise 714 else: 715 break 716 else: 717 raise 718 719 _winapi.SetNamedPipeHandleState( 720 h, _winapi.PIPE_READMODE_MESSAGE, None, None 721 ) 722 return PipeConnection(h) 723 724# 725# Authentication stuff 726# 727 728MESSAGE_LENGTH = 20 729 730CHALLENGE = b'#CHALLENGE#' 731WELCOME = b'#WELCOME#' 732FAILURE = b'#FAILURE#' 733 734def deliver_challenge(connection, authkey): 735 import hmac 736 assert isinstance(authkey, bytes) 737 message = os.urandom(MESSAGE_LENGTH) 738 connection.send_bytes(CHALLENGE + message) 739 digest = hmac.new(authkey, message, 'md5').digest() 740 response = connection.recv_bytes(256) # reject large message 741 if response == digest: 742 connection.send_bytes(WELCOME) 743 else: 744 connection.send_bytes(FAILURE) 745 raise AuthenticationError('digest received was wrong') 746 747def answer_challenge(connection, authkey): 748 import hmac 749 assert isinstance(authkey, bytes) 750 message = connection.recv_bytes(256) # reject large message 751 assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message 752 message = message[len(CHALLENGE):] 753 digest = hmac.new(authkey, message, 'md5').digest() 754 connection.send_bytes(digest) 755 response = connection.recv_bytes(256) # reject large message 756 if response != WELCOME: 757 raise AuthenticationError('digest sent was rejected') 758 759# 760# Support for using xmlrpclib for serialization 761# 762 763class ConnectionWrapper(object): 764 def __init__(self, conn, dumps, loads): 765 self._conn = conn 766 self._dumps = dumps 767 self._loads = loads 768 for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): 769 obj = getattr(conn, attr) 770 setattr(self, attr, obj) 771 def send(self, obj): 772 s = self._dumps(obj) 773 self._conn.send_bytes(s) 774 def recv(self): 775 s = self._conn.recv_bytes() 776 return self._loads(s) 777 778def _xml_dumps(obj): 779 return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') 780 781def _xml_loads(s): 782 (obj,), method = xmlrpclib.loads(s.decode('utf-8')) 783 return obj 784 785class XmlListener(Listener): 786 def accept(self): 787 global xmlrpclib 788 import xmlrpc.client as xmlrpclib 789 obj = Listener.accept(self) 790 return ConnectionWrapper(obj, _xml_dumps, _xml_loads) 791 792def XmlClient(*args, **kwds): 793 global xmlrpclib 794 import xmlrpc.client as xmlrpclib 795 return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) 796 797# 798# Wait 799# 800 801if sys.platform == 'win32': 802 803 def _exhaustive_wait(handles, timeout): 804 # Return ALL handles which are currently signalled. (Only 805 # returning the first signalled might create starvation issues.) 806 L = list(handles) 807 ready = [] 808 while L: 809 res = _winapi.WaitForMultipleObjects(L, False, timeout) 810 if res == WAIT_TIMEOUT: 811 break 812 elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): 813 res -= WAIT_OBJECT_0 814 elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): 815 res -= WAIT_ABANDONED_0 816 else: 817 raise RuntimeError('Should not get here') 818 ready.append(L[res]) 819 L = L[res+1:] 820 timeout = 0 821 return ready 822 823 _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} 824 825 def wait(object_list, timeout=None): 826 ''' 827 Wait till an object in object_list is ready/readable. 828 829 Returns list of those objects in object_list which are ready/readable. 830 ''' 831 if timeout is None: 832 timeout = INFINITE 833 elif timeout < 0: 834 timeout = 0 835 else: 836 timeout = int(timeout * 1000 + 0.5) 837 838 object_list = list(object_list) 839 waithandle_to_obj = {} 840 ov_list = [] 841 ready_objects = set() 842 ready_handles = set() 843 844 try: 845 for o in object_list: 846 try: 847 fileno = getattr(o, 'fileno') 848 except AttributeError: 849 waithandle_to_obj[o.__index__()] = o 850 else: 851 # start an overlapped read of length zero 852 try: 853 ov, err = _winapi.ReadFile(fileno(), 0, True) 854 except OSError as e: 855 ov, err = None, e.winerror 856 if err not in _ready_errors: 857 raise 858 if err == _winapi.ERROR_IO_PENDING: 859 ov_list.append(ov) 860 waithandle_to_obj[ov.event] = o 861 else: 862 # If o.fileno() is an overlapped pipe handle and 863 # err == 0 then there is a zero length message 864 # in the pipe, but it HAS NOT been consumed... 865 if ov and sys.getwindowsversion()[:2] >= (6, 2): 866 # ... except on Windows 8 and later, where 867 # the message HAS been consumed. 868 try: 869 _, err = ov.GetOverlappedResult(False) 870 except OSError as e: 871 err = e.winerror 872 if not err and hasattr(o, '_got_empty_message'): 873 o._got_empty_message = True 874 ready_objects.add(o) 875 timeout = 0 876 877 ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) 878 finally: 879 # request that overlapped reads stop 880 for ov in ov_list: 881 ov.cancel() 882 883 # wait for all overlapped reads to stop 884 for ov in ov_list: 885 try: 886 _, err = ov.GetOverlappedResult(True) 887 except OSError as e: 888 err = e.winerror 889 if err not in _ready_errors: 890 raise 891 if err != _winapi.ERROR_OPERATION_ABORTED: 892 o = waithandle_to_obj[ov.event] 893 ready_objects.add(o) 894 if err == 0: 895 # If o.fileno() is an overlapped pipe handle then 896 # a zero length message HAS been consumed. 897 if hasattr(o, '_got_empty_message'): 898 o._got_empty_message = True 899 900 ready_objects.update(waithandle_to_obj[h] for h in ready_handles) 901 return [o for o in object_list if o in ready_objects] 902 903else: 904 905 import selectors 906 907 # poll/select have the advantage of not requiring any extra file 908 # descriptor, contrarily to epoll/kqueue (also, they require a single 909 # syscall). 910 if hasattr(selectors, 'PollSelector'): 911 _WaitSelector = selectors.PollSelector 912 else: 913 _WaitSelector = selectors.SelectSelector 914 915 def wait(object_list, timeout=None): 916 ''' 917 Wait till an object in object_list is ready/readable. 918 919 Returns list of those objects in object_list which are ready/readable. 920 ''' 921 with _WaitSelector() as selector: 922 for obj in object_list: 923 selector.register(obj, selectors.EVENT_READ) 924 925 if timeout is not None: 926 deadline = time.time() + timeout 927 928 while True: 929 ready = selector.select(timeout) 930 if ready: 931 return [key.fileobj for (key, events) in ready] 932 else: 933 if timeout is not None: 934 timeout = deadline - time.time() 935 if timeout < 0: 936 return ready 937 938# 939# Make connection and socket objects sharable if possible 940# 941 942if sys.platform == 'win32': 943 def reduce_connection(conn): 944 handle = conn.fileno() 945 with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: 946 from . import resource_sharer 947 ds = resource_sharer.DupSocket(s) 948 return rebuild_connection, (ds, conn.readable, conn.writable) 949 def rebuild_connection(ds, readable, writable): 950 sock = ds.detach() 951 return Connection(sock.detach(), readable, writable) 952 reduction.register(Connection, reduce_connection) 953 954 def reduce_pipe_connection(conn): 955 access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | 956 (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) 957 dh = reduction.DupHandle(conn.fileno(), access) 958 return rebuild_pipe_connection, (dh, conn.readable, conn.writable) 959 def rebuild_pipe_connection(dh, readable, writable): 960 handle = dh.detach() 961 return PipeConnection(handle, readable, writable) 962 reduction.register(PipeConnection, reduce_pipe_connection) 963 964else: 965 def reduce_connection(conn): 966 df = reduction.DupFd(conn.fileno()) 967 return rebuild_connection, (df, conn.readable, conn.writable) 968 def rebuild_connection(df, readable, writable): 969 fd = df.detach() 970 return Connection(fd, readable, writable) 971 reduction.register(Connection, reduce_connection) 972