#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import io
import os
import sys
import socket
import struct
import time
import tempfile
import itertools

try:
    import _multiprocess as _multiprocessing
except ImportError:
    import _multiprocessing

from . import util

from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler

try:
    import _winapi
    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
except ImportError:
    if sys.platform == 'win32':
        raise
    _winapi = None

#
#
#

BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

_mmap_counter = itertools.count()

default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the deadline (current clock value plus `timeout` seconds).

    Uses time.monotonic() when available, otherwise time.time().
    '''
    return getattr(time,'monotonic',time.time)() + timeout

def _check_timeout(t):
    '''
    Return True if the deadline `t` (as returned by _init_timeout) has passed.
    '''
    return getattr(time, 'monotonic',time.time)() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')

def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError if it is not.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)

def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
    '''
    if type(address) is tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str:
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Connection classes
#

class _ConnectionBase:
    """
    Common base of the connection classes.

    Wraps an integer handle and implements the public send/recv API on
    top of the `_close`, `_send_bytes`, `_recv_bytes` and `_poll` methods
    which subclasses must provide.
    """
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        """Raise OSError if the connection has been closed"""
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        """Raise OSError if the connection cannot be read from"""
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        """Raise OSError if the connection cannot be written to"""
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        """
        Handle an over-long incoming message; always raises OSError.

        A writable connection is kept open but marked unreadable; a
        read-only connection is closed.
        """
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Forget the handle even if closing it failed.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object"""
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if m.itemsize > 1:
            m = memoryview(bytes(m))
        n = len(m)
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            # _bad_message_length() always raises.
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE: unpickling executes arbitrary code; only use with trusted
        # (e.g. authenticated) peers.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set by wait() when a zero-length message has already been
        # consumed from the pipe; the next _recv_bytes() returns it.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            """Write `buf` as one message, waiting for completion"""
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            """
            Read one message and return it in an io.BytesIO.

            Raises EOFError if the pipe is broken.
            """
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            """
            Read the rest of a message whose first part is held by `ov`.

            Calls _bad_message_length() (which raises OSError) if the
            whole message would exceed `maxsize`.
            """
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f


class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf` to the handle, retrying on partial writes"""
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        """
        Read exactly `size` bytes and return them in an io.BytesIO.

        Raises EOFError if the stream ends before any byte is read, and
        OSError if it ends part way through.
        """
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        """Send `buf` prefixed by its length as a 4-byte "!i" header"""
        n = len(buf)
        # For wire compatibility with 3.2 and lower
        header = struct.pack("!i", n)
        if n > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """
        Read one length-prefixed message and return it in an io.BytesIO.

        Returns None (without reading the payload) if the announced size
        exceeds `maxsize`.
        """
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        r = wait([self], timeout)
        return bool(r)


#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)

        # Validate authkey *before* binding anything, so that a bad key
        # does not leave behind a listening socket or pipe.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            self._listener = None
            listener.close()

    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`
    '''
    family = family or address_type(address)
    _validate_family(family)

    # Validate authkey *before* connecting, so that a bad key does not
    # leave behind an open connection.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except OSError:
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is closed/finalized.
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept a connection and return it as a `Connection` object
        '''
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        '''
        Close the socket and run the pending unlink finalizer, if any
        '''
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    with socket.socket( getattr(socket, family) ) as s:
        s.setblocking(True)
        s.connect(address)
        return Connection(s.detach())

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            '''
            Create and return a new instance handle of the named pipe
            '''
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            '''
            Wait for a client and return a `PipeConnection` to it
            '''
            # Queue a fresh instance before serving the oldest one.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        t = _init_timeout()
        # Retry while the pipe is busy or the wait times out, until the
        # overall deadline `t` has passed; any other error propagates.
        while 1:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove that it knows `authkey`.

    Sends CHALLENGE plus a random message, then compares the reply with
    the HMAC-MD5 digest of that message.  Sends WELCOME on a match;
    otherwise sends FAILURE and raises AuthenticationError.  Raises
    TypeError if `authkey` is not a bytes object.
    '''
    import hmac
    # Explicit check instead of an assert, which is stripped under -O.
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, to avoid leaking the digest via timing.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by deliver_challenge() on `connection`.

    Replies with the HMAC-MD5 digest of the challenge message.  Raises
    AuthenticationError if the incoming message is not a challenge or if
    the peer does not answer WELCOME.  Raises TypeError if `authkey` is
    not a bytes object.
    '''
    import hmac
    # Explicit checks instead of asserts, which are stripped under -O.
    if not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')
    message = connection.recv_bytes(256)         # reject large message
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#

class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use the given `dumps` and
    `loads` callables; other methods are forwarded to the connection.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        s = self._conn.recv_bytes()
        return self._loads(s)

# NOTE: `xmlrpclib` is bound as a module global by XmlListener.accept()
# or XmlClient() before the two helpers below are used.

def _xml_dumps(obj):
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')

def _xml_loads(s):
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj

class XmlListener(Listener):
    def accept(self):
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)

#
# Wait
#

if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        L = list(handles)
        ready = []
        while L:
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
                res -= WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
                res -= WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(L[res])
            # Only the handles after the signalled one remain to be
            # checked, and without blocking again.
            L = L[res+1:]
            timeout = 0
        return ready

    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            # seconds -> milliseconds, rounded to nearest
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        waithandle_to_obj = {}
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        return [o for o in object_list if o in ready_objects]

else:

    import selectors

    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrarily to epoll/kqueue (also, they require a single
    # syscall).
    if hasattr(selectors, 'PollSelector'):
        _WaitSelector = selectors.PollSelector
    else:
        _WaitSelector = selectors.SelectSelector

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        with _WaitSelector() as selector:
            for obj in object_list:
                selector.register(obj, selectors.EVENT_READ)

            if timeout is not None:
                deadline = getattr(time,'monotonic',time.time)() + timeout

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, events) in ready]
                else:
                    # Nothing ready: retry with whatever time remains,
                    # giving up once the deadline has passed.
                    if timeout is not None:
                        timeout = deadline - getattr(time,'monotonic',time.time)()
                        if timeout < 0:
                            return ready

#
# Make connection and socket objects sharable if possible
#

if sys.platform == 'win32':
    def reduce_connection(conn):
        handle = conn.fileno()
        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
            from . import resource_sharer
            ds = resource_sharer.DupSocket(s)
            return rebuild_connection, (ds, conn.readable, conn.writable)
    def rebuild_connection(ds, readable, writable):
        sock = ds.detach()
        return Connection(sock.detach(), readable, writable)
    reduction.register(Connection, reduce_connection)

    def reduce_pipe_connection(conn):
        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
        dh = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
    def rebuild_pipe_connection(dh, readable, writable):
        handle = dh.detach()
        return PipeConnection(handle, readable, writable)
    reduction.register(PipeConnection, reduce_pipe_connection)

else:
    def reduce_connection(conn):
        df = reduction.DupFd(conn.fileno())
        return rebuild_connection, (df, conn.readable, conn.writable)
    def rebuild_connection(df, readable, writable):
        fd = df.detach()
        return Connection(fd, readable, writable)
    reduction.register(Connection, reduce_connection)