1"""Selector and proactor event loops for Windows.""" 2 3import errno 4import math 5import socket 6import struct 7import weakref 8 9from . import events 10from . import base_subprocess 11from . import futures 12from . import proactor_events 13from . import py33_winapi as _winapi 14from . import selector_events 15from . import tasks 16from . import windows_utils 17from . import _overlapped 18from .coroutines import coroutine, From, Return 19from .log import logger 20from .py33_exceptions import wrap_error, BrokenPipeError, ConnectionResetError 21 22 23__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', 24 'DefaultEventLoopPolicy', 25 ] 26 27 28NULL = 0 29INFINITE = 0xffffffff 30ERROR_CONNECTION_REFUSED = 1225 31ERROR_CONNECTION_ABORTED = 1236 32 33# Initial delay in seconds for connect_pipe() before retrying to connect 34CONNECT_PIPE_INIT_DELAY = 0.001 35 36# Maximum delay in seconds for connect_pipe() before retrying to connect 37CONNECT_PIPE_MAX_DELAY = 0.100 38 39 40class _OverlappedFuture(futures.Future): 41 """Subclass of Future which represents an overlapped operation. 42 43 Cancelling it will immediately cancel the overlapped operation. 44 """ 45 46 def __init__(self, ov, loop=None): 47 super(_OverlappedFuture, self).__init__(loop=loop) 48 if self._source_traceback: 49 del self._source_traceback[-1] 50 self._ov = ov 51 52 def _repr_info(self): 53 info = super(_OverlappedFuture, self)._repr_info() 54 if self._ov is not None: 55 state = 'pending' if self._ov.pending else 'completed' 56 info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address)) 57 return info 58 59 def _cancel_overlapped(self): 60 if self._ov is None: 61 return 62 try: 63 self._ov.cancel() 64 except OSError as exc: 65 context = { 66 'message': 'Cancelling an overlapped future failed', 67 'exception': exc, 68 'future': self, 69 } 70 if self._source_traceback: 71 context['source_traceback'] = self._source_traceback 72 self._loop.call_exception_handler(context) 73 self._ov = None 74 75 def cancel(self): 76 self._cancel_overlapped() 77 return super(_OverlappedFuture, self).cancel() 78 79 def set_exception(self, exception): 80 super(_OverlappedFuture, self).set_exception(exception) 81 self._cancel_overlapped() 82 83 def set_result(self, result): 84 super(_OverlappedFuture, self).set_result(result) 85 self._ov = None 86 87 88class _BaseWaitHandleFuture(futures.Future): 89 """Subclass of Future which represents a wait handle.""" 90 91 def __init__(self, ov, handle, wait_handle, loop=None): 92 super(_BaseWaitHandleFuture, self).__init__(loop=loop) 93 if self._source_traceback: 94 del self._source_traceback[-1] 95 # Keep a reference to the Overlapped object to keep it alive until the 96 # wait is unregistered 97 self._ov = ov 98 self._handle = handle 99 self._wait_handle = wait_handle 100 101 # Should we call UnregisterWaitEx() if the wait completes 102 # or is cancelled? 103 self._registered = True 104 105 def _poll(self): 106 # non-blocking wait: use a timeout of 0 millisecond 107 return (_winapi.WaitForSingleObject(self._handle, 0) == 108 _winapi.WAIT_OBJECT_0) 109 110 def _repr_info(self): 111 info = super(_BaseWaitHandleFuture, self)._repr_info() 112 info.append('handle=%#x' % self._handle) 113 if self._handle is not None: 114 state = 'signaled' if self._poll() else 'waiting' 115 info.append(state) 116 if self._wait_handle is not None: 117 info.append('wait_handle=%#x' % self._wait_handle) 118 return info 119 120 def _unregister_wait_cb(self, fut): 121 # The wait was unregistered: it's not safe to destroy the Overlapped 122 # object 123 self._ov = None 124 125 def _unregister_wait(self): 126 if not self._registered: 127 return 128 self._registered = False 129 130 wait_handle = self._wait_handle 131 self._wait_handle = None 132 try: 133 _overlapped.UnregisterWait(wait_handle) 134 except OSError as exc: 135 if exc.winerror != _overlapped.ERROR_IO_PENDING: 136 context = { 137 'message': 'Failed to unregister the wait handle', 138 'exception': exc, 139 'future': self, 140 } 141 if self._source_traceback: 142 context['source_traceback'] = self._source_traceback 143 self._loop.call_exception_handler(context) 144 return 145 # ERROR_IO_PENDING means that the unregister is pending 146 147 self._unregister_wait_cb(None) 148 149 def cancel(self): 150 self._unregister_wait() 151 return super(_BaseWaitHandleFuture, self).cancel() 152 153 def set_exception(self, exception): 154 self._unregister_wait() 155 super(_BaseWaitHandleFuture, self).set_exception(exception) 156 157 def set_result(self, result): 158 self._unregister_wait() 159 super(_BaseWaitHandleFuture, self).set_result(result) 160 161 162class _WaitCancelFuture(_BaseWaitHandleFuture): 163 """Subclass of Future which represents a wait for the cancellation of a 164 _WaitHandleFuture using an event. 165 """ 166 167 def __init__(self, ov, event, wait_handle, loop=None): 168 super(_WaitCancelFuture, self).__init__(ov, event, wait_handle, 169 loop=loop) 170 171 self._done_callback = None 172 173 def cancel(self): 174 raise RuntimeError("_WaitCancelFuture must not be cancelled") 175 176 def _schedule_callbacks(self): 177 super(_WaitCancelFuture, self)._schedule_callbacks() 178 if self._done_callback is not None: 179 self._done_callback(self) 180 181 182class _WaitHandleFuture(_BaseWaitHandleFuture): 183 def __init__(self, ov, handle, wait_handle, proactor, loop=None): 184 super(_WaitHandleFuture, self).__init__(ov, handle, wait_handle, 185 loop=loop) 186 self._proactor = proactor 187 self._unregister_proactor = True 188 self._event = _overlapped.CreateEvent(None, True, False, None) 189 self._event_fut = None 190 191 def _unregister_wait_cb(self, fut): 192 if self._event is not None: 193 _winapi.CloseHandle(self._event) 194 self._event = None 195 self._event_fut = None 196 197 # If the wait was cancelled, the wait may never be signalled, so 198 # it's required to unregister it. Otherwise, IocpProactor.close() will 199 # wait forever for an event which will never come. 200 # 201 # If the IocpProactor already received the event, it's safe to call 202 # _unregister() because we kept a reference to the Overlapped object 203 # which is used as an unique key. 204 self._proactor._unregister(self._ov) 205 self._proactor = None 206 207 super(_WaitHandleFuture, self)._unregister_wait_cb(fut) 208 209 def _unregister_wait(self): 210 if not self._registered: 211 return 212 self._registered = False 213 214 wait_handle = self._wait_handle 215 self._wait_handle = None 216 try: 217 _overlapped.UnregisterWaitEx(wait_handle, self._event) 218 except OSError as exc: 219 if exc.winerror != _overlapped.ERROR_IO_PENDING: 220 context = { 221 'message': 'Failed to unregister the wait handle', 222 'exception': exc, 223 'future': self, 224 } 225 if self._source_traceback: 226 context['source_traceback'] = self._source_traceback 227 self._loop.call_exception_handler(context) 228 return 229 # ERROR_IO_PENDING is not an error, the wait was unregistered 230 231 self._event_fut = self._proactor._wait_cancel(self._event, 232 self._unregister_wait_cb) 233 234 235class PipeServer(object): 236 """Class representing a pipe server. 237 238 This is much like a bound, listening socket. 239 """ 240 def __init__(self, address): 241 self._address = address 242 self._free_instances = weakref.WeakSet() 243 # initialize the pipe attribute before calling _server_pipe_handle() 244 # because this function can raise an exception and the destructor calls 245 # the close() method 246 self._pipe = None 247 self._accept_pipe_future = None 248 self._pipe = self._server_pipe_handle(True) 249 250 def _get_unconnected_pipe(self): 251 # Create new instance and return previous one. This ensures 252 # that (until the server is closed) there is always at least 253 # one pipe handle for address. Therefore if a client attempt 254 # to connect it will not fail with FileNotFoundError. 255 tmp, self._pipe = self._pipe, self._server_pipe_handle(False) 256 return tmp 257 258 def _server_pipe_handle(self, first): 259 # Return a wrapper for a new pipe handle. 260 if self.closed(): 261 return None 262 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 263 if first: 264 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 265 h = wrap_error(_winapi.CreateNamedPipe, 266 self._address, flags, 267 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 268 _winapi.PIPE_WAIT, 269 _winapi.PIPE_UNLIMITED_INSTANCES, 270 windows_utils.BUFSIZE, windows_utils.BUFSIZE, 271 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) 272 pipe = windows_utils.PipeHandle(h) 273 self._free_instances.add(pipe) 274 return pipe 275 276 def closed(self): 277 return (self._address is None) 278 279 def close(self): 280 if self._accept_pipe_future is not None: 281 self._accept_pipe_future.cancel() 282 self._accept_pipe_future = None 283 # Close all instances which have not been connected to by a client. 284 if self._address is not None: 285 for pipe in self._free_instances: 286 pipe.close() 287 self._pipe = None 288 self._address = None 289 self._free_instances.clear() 290 291 __del__ = close 292 293 294class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): 295 """Windows version of selector event loop.""" 296 297 def _socketpair(self): 298 return windows_utils.socketpair() 299 300 301class ProactorEventLoop(proactor_events.BaseProactorEventLoop): 302 """Windows version of proactor event loop using IOCP.""" 303 304 def __init__(self, proactor=None): 305 if proactor is None: 306 proactor = IocpProactor() 307 super(ProactorEventLoop, self).__init__(proactor) 308 309 def _socketpair(self): 310 return windows_utils.socketpair() 311 312 @coroutine 313 def create_pipe_connection(self, protocol_factory, address): 314 f = self._proactor.connect_pipe(address) 315 pipe = yield From(f) 316 protocol = protocol_factory() 317 trans = self._make_duplex_pipe_transport(pipe, protocol, 318 extra={'addr': address}) 319 raise Return(trans, protocol) 320 321 @coroutine 322 def start_serving_pipe(self, protocol_factory, address): 323 server = PipeServer(address) 324 325 def loop_accept_pipe(f=None): 326 pipe = None 327 try: 328 if f: 329 pipe = f.result() 330 server._free_instances.discard(pipe) 331 332 if server.closed(): 333 # A client connected before the server was closed: 334 # drop the client (close the pipe) and exit 335 pipe.close() 336 return 337 338 protocol = protocol_factory() 339 self._make_duplex_pipe_transport( 340 pipe, protocol, extra={'addr': address}) 341 342 pipe = server._get_unconnected_pipe() 343 if pipe is None: 344 return 345 346 f = self._proactor.accept_pipe(pipe) 347 except OSError as exc: 348 if pipe and pipe.fileno() != -1: 349 self.call_exception_handler({ 350 'message': 'Pipe accept failed', 351 'exception': exc, 352 'pipe': pipe, 353 }) 354 pipe.close() 355 elif self._debug: 356 logger.warning("Accept pipe failed on pipe %r", 357 pipe, exc_info=True) 358 except futures.CancelledError: 359 if pipe: 360 pipe.close() 361 else: 362 server._accept_pipe_future = f 363 f.add_done_callback(loop_accept_pipe) 364 365 self.call_soon(loop_accept_pipe) 366 return [server] 367 368 @coroutine 369 def _make_subprocess_transport(self, protocol, args, shell, 370 stdin, stdout, stderr, bufsize, 371 extra=None, **kwargs): 372 waiter = futures.Future(loop=self) 373 transp = _WindowsSubprocessTransport(self, protocol, args, shell, 374 stdin, stdout, stderr, bufsize, 375 waiter=waiter, extra=extra, 376 **kwargs) 377 try: 378 yield From(waiter) 379 except Exception as exc: 380 # Workaround CPython bug #23353: using yield/yield-from in an 381 # except block of a generator doesn't clear properly sys.exc_info() 382 err = exc 383 else: 384 err = None 385 386 if err is not None: 387 transp.close() 388 yield From(transp._wait()) 389 raise err 390 391 raise Return(transp) 392 393 394class IocpProactor(object): 395 """Proactor implementation using IOCP.""" 396 397 def __init__(self, concurrency=0xffffffff): 398 self._loop = None 399 self._results = [] 400 self._iocp = _overlapped.CreateIoCompletionPort( 401 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) 402 self._cache = {} 403 self._registered = weakref.WeakSet() 404 self._unregistered = [] 405 self._stopped_serving = weakref.WeakSet() 406 407 def __repr__(self): 408 return ('<%s overlapped#=%s result#=%s>' 409 % (self.__class__.__name__, len(self._cache), 410 len(self._results))) 411 412 def set_loop(self, loop): 413 self._loop = loop 414 415 def select(self, timeout=None): 416 if not self._results: 417 self._poll(timeout) 418 tmp = self._results 419 self._results = [] 420 return tmp 421 422 def _result(self, value): 423 fut = futures.Future(loop=self._loop) 424 fut.set_result(value) 425 return fut 426 427 def recv(self, conn, nbytes, flags=0): 428 self._register_with_iocp(conn) 429 ov = _overlapped.Overlapped(NULL) 430 try: 431 if isinstance(conn, socket.socket): 432 wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags) 433 else: 434 wrap_error(ov.ReadFile, conn.fileno(), nbytes) 435 except BrokenPipeError: 436 return self._result(b'') 437 438 def finish_recv(trans, key, ov): 439 try: 440 return wrap_error(ov.getresult) 441 except WindowsError as exc: 442 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: 443 raise ConnectionResetError(*exc.args) 444 else: 445 raise 446 447 return self._register(ov, conn, finish_recv) 448 449 def send(self, conn, buf, flags=0): 450 self._register_with_iocp(conn) 451 ov = _overlapped.Overlapped(NULL) 452 if isinstance(conn, socket.socket): 453 ov.WSASend(conn.fileno(), buf, flags) 454 else: 455 ov.WriteFile(conn.fileno(), buf) 456 457 def finish_send(trans, key, ov): 458 try: 459 return wrap_error(ov.getresult) 460 except WindowsError as exc: 461 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: 462 raise ConnectionResetError(*exc.args) 463 else: 464 raise 465 466 return self._register(ov, conn, finish_send) 467 468 def accept(self, listener): 469 self._register_with_iocp(listener) 470 conn = self._get_accept_socket(listener.family) 471 ov = _overlapped.Overlapped(NULL) 472 ov.AcceptEx(listener.fileno(), conn.fileno()) 473 474 def finish_accept(trans, key, ov): 475 wrap_error(ov.getresult) 476 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work. 477 buf = struct.pack('@P', listener.fileno()) 478 conn.setsockopt(socket.SOL_SOCKET, 479 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf) 480 conn.settimeout(listener.gettimeout()) 481 return conn, conn.getpeername() 482 483 @coroutine 484 def accept_coro(future, conn): 485 # Coroutine closing the accept socket if the future is cancelled 486 try: 487 yield From(future) 488 except futures.CancelledError: 489 conn.close() 490 raise 491 492 future = self._register(ov, listener, finish_accept) 493 coro = accept_coro(future, conn) 494 tasks.ensure_future(coro, loop=self._loop) 495 return future 496 497 def connect(self, conn, address): 498 self._register_with_iocp(conn) 499 # The socket needs to be locally bound before we call ConnectEx(). 500 try: 501 _overlapped.BindLocal(conn.fileno(), conn.family) 502 except WindowsError as e: 503 if e.winerror != errno.WSAEINVAL: 504 raise 505 # Probably already locally bound; check using getsockname(). 506 if conn.getsockname()[1] == 0: 507 raise 508 ov = _overlapped.Overlapped(NULL) 509 ov.ConnectEx(conn.fileno(), address) 510 511 def finish_connect(trans, key, ov): 512 wrap_error(ov.getresult) 513 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work. 514 conn.setsockopt(socket.SOL_SOCKET, 515 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0) 516 return conn 517 518 return self._register(ov, conn, finish_connect) 519 520 def accept_pipe(self, pipe): 521 self._register_with_iocp(pipe) 522 ov = _overlapped.Overlapped(NULL) 523 connected = ov.ConnectNamedPipe(pipe.fileno()) 524 525 if connected: 526 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means 527 # that the pipe is connected. There is no need to wait for the 528 # completion of the connection. 529 return self._result(pipe) 530 531 def finish_accept_pipe(trans, key, ov): 532 wrap_error(ov.getresult) 533 return pipe 534 535 return self._register(ov, pipe, finish_accept_pipe) 536 537 @coroutine 538 def connect_pipe(self, address): 539 delay = CONNECT_PIPE_INIT_DELAY 540 while True: 541 # Unfortunately there is no way to do an overlapped connect to a pipe. 542 # Call CreateFile() in a loop until it doesn't fail with 543 # ERROR_PIPE_BUSY 544 try: 545 handle = wrap_error(_overlapped.ConnectPipe, address) 546 break 547 except WindowsError as exc: 548 if exc.winerror != _overlapped.ERROR_PIPE_BUSY: 549 raise 550 551 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later 552 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) 553 yield From(tasks.sleep(delay, loop=self._loop)) 554 555 raise Return(windows_utils.PipeHandle(handle)) 556 557 def wait_for_handle(self, handle, timeout=None): 558 """Wait for a handle. 559 560 Return a Future object. The result of the future is True if the wait 561 completed, or False if the wait did not complete (on timeout). 562 """ 563 return self._wait_for_handle(handle, timeout, False) 564 565 def _wait_cancel(self, event, done_callback): 566 fut = self._wait_for_handle(event, None, True) 567 # add_done_callback() cannot be used because the wait may only complete 568 # in IocpProactor.close(), while the event loop is not running. 569 fut._done_callback = done_callback 570 return fut 571 572 def _wait_for_handle(self, handle, timeout, _is_cancel): 573 if timeout is None: 574 ms = _winapi.INFINITE 575 else: 576 # RegisterWaitForSingleObject() has a resolution of 1 millisecond, 577 # round away from zero to wait *at least* timeout seconds. 578 ms = int(math.ceil(timeout * 1e3)) 579 580 # We only create ov so we can use ov.address as a key for the cache. 581 ov = _overlapped.Overlapped(NULL) 582 wait_handle = _overlapped.RegisterWaitWithQueue( 583 handle, self._iocp, ov.address, ms) 584 if _is_cancel: 585 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) 586 else: 587 f = _WaitHandleFuture(ov, handle, wait_handle, self, 588 loop=self._loop) 589 if f._source_traceback: 590 del f._source_traceback[-1] 591 592 def finish_wait_for_handle(trans, key, ov): 593 # Note that this second wait means that we should only use 594 # this with handles types where a successful wait has no 595 # effect. So events or processes are all right, but locks 596 # or semaphores are not. Also note if the handle is 597 # signalled and then quickly reset, then we may return 598 # False even though we have not timed out. 599 return f._poll() 600 601 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) 602 return f 603 604 def _register_with_iocp(self, obj): 605 # To get notifications of finished ops on this objects sent to the 606 # completion port, were must register the handle. 607 if obj not in self._registered: 608 self._registered.add(obj) 609 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0) 610 # XXX We could also use SetFileCompletionNotificationModes() 611 # to avoid sending notifications to completion port of ops 612 # that succeed immediately. 613 614 def _register(self, ov, obj, callback): 615 # Return a future which will be set with the result of the 616 # operation when it completes. The future's value is actually 617 # the value returned by callback(). 618 f = _OverlappedFuture(ov, loop=self._loop) 619 if f._source_traceback: 620 del f._source_traceback[-1] 621 if not ov.pending: 622 # The operation has completed, so no need to postpone the 623 # work. We cannot take this short cut if we need the 624 # NumberOfBytes, CompletionKey values returned by 625 # PostQueuedCompletionStatus(). 626 try: 627 value = callback(None, None, ov) 628 except OSError as e: 629 f.set_exception(e) 630 else: 631 f.set_result(value) 632 # Even if GetOverlappedResult() was called, we have to wait for the 633 # notification of the completion in GetQueuedCompletionStatus(). 634 # Register the overlapped operation to keep a reference to the 635 # OVERLAPPED object, otherwise the memory is freed and Windows may 636 # read uninitialized memory. 637 638 # Register the overlapped operation for later. Note that 639 # we only store obj to prevent it from being garbage 640 # collected too early. 641 self._cache[ov.address] = (f, ov, obj, callback) 642 return f 643 644 def _unregister(self, ov): 645 """Unregister an overlapped object. 646 647 Call this method when its future has been cancelled. The event can 648 already be signalled (pending in the proactor event queue). It is also 649 safe if the event is never signalled (because it was cancelled). 650 """ 651 self._unregistered.append(ov) 652 653 def _get_accept_socket(self, family): 654 s = socket.socket(family) 655 s.settimeout(0) 656 return s 657 658 def _poll(self, timeout=None): 659 if timeout is None: 660 ms = INFINITE 661 elif timeout < 0: 662 raise ValueError("negative timeout") 663 else: 664 # GetQueuedCompletionStatus() has a resolution of 1 millisecond, 665 # round away from zero to wait *at least* timeout seconds. 666 ms = int(math.ceil(timeout * 1e3)) 667 if ms >= INFINITE: 668 raise ValueError("timeout too big") 669 670 while True: 671 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) 672 if status is None: 673 break 674 ms = 0 675 676 err, transferred, key, address = status 677 try: 678 f, ov, obj, callback = self._cache.pop(address) 679 except KeyError: 680 if self._loop.get_debug(): 681 self._loop.call_exception_handler({ 682 'message': ('GetQueuedCompletionStatus() returned an ' 683 'unexpected event'), 684 'status': ('err=%s transferred=%s key=%#x address=%#x' 685 % (err, transferred, key, address)), 686 }) 687 688 # key is either zero, or it is used to return a pipe 689 # handle which should be closed to avoid a leak. 690 if key not in (0, _overlapped.INVALID_HANDLE_VALUE): 691 _winapi.CloseHandle(key) 692 continue 693 694 if obj in self._stopped_serving: 695 f.cancel() 696 # Don't call the callback if _register() already read the result or 697 # if the overlapped has been cancelled 698 elif not f.done(): 699 try: 700 value = callback(transferred, key, ov) 701 except OSError as e: 702 f.set_exception(e) 703 self._results.append(f) 704 else: 705 f.set_result(value) 706 self._results.append(f) 707 708 # Remove unregisted futures 709 for ov in self._unregistered: 710 self._cache.pop(ov.address, None) 711 del self._unregistered[:] 712 713 def _stop_serving(self, obj): 714 # obj is a socket or pipe handle. It will be closed in 715 # BaseProactorEventLoop._stop_serving() which will make any 716 # pending operations fail quickly. 717 self._stopped_serving.add(obj) 718 719 def close(self): 720 # Cancel remaining registered operations. 721 for address, (fut, ov, obj, callback) in list(self._cache.items()): 722 if fut.cancelled(): 723 # Nothing to do with cancelled futures 724 pass 725 elif isinstance(fut, _WaitCancelFuture): 726 # _WaitCancelFuture must not be cancelled 727 pass 728 else: 729 try: 730 fut.cancel() 731 except OSError as exc: 732 if self._loop is not None: 733 context = { 734 'message': 'Cancelling a future failed', 735 'exception': exc, 736 'future': fut, 737 } 738 if fut._source_traceback: 739 context['source_traceback'] = fut._source_traceback 740 self._loop.call_exception_handler(context) 741 742 while self._cache: 743 if not self._poll(1): 744 logger.debug('taking long time to close proactor') 745 746 self._results = [] 747 if self._iocp is not None: 748 _winapi.CloseHandle(self._iocp) 749 self._iocp = None 750 751 def __del__(self): 752 self.close() 753 754 755class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): 756 757 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 758 self._proc = windows_utils.Popen( 759 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 760 bufsize=bufsize, **kwargs) 761 762 def callback(f): 763 returncode = self._proc.poll() 764 self._process_exited(returncode) 765 766 f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) 767 f.add_done_callback(callback) 768 769 770SelectorEventLoop = _WindowsSelectorEventLoop 771 772 773class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 774 _loop_factory = SelectorEventLoop 775 776 777DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy 778