1"""Selector and proactor event loops for Windows."""
2
3import errno
4import math
5import socket
6import struct
7import weakref
8
9from . import events
10from . import base_subprocess
11from . import futures
12from . import proactor_events
13from . import py33_winapi as _winapi
14from . import selector_events
15from . import tasks
16from . import windows_utils
17from . import _overlapped
18from .coroutines import coroutine, From, Return
19from .log import logger
20from .py33_exceptions import wrap_error, BrokenPipeError, ConnectionResetError
21
22
# Public names of this module: the ones exported by a star-import.
__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
           'DefaultEventLoopPolicy',
           ]
26
27
# Null value handed to the _overlapped functions (Overlapped(),
# CreateIoCompletionPort()).
NULL = 0
# Timeout, in milliseconds, used by IocpProactor._poll() when no timeout is
# given, i.e. to wait without limit.
INFINITE = 0xFFFFFFFF
# Windows error codes.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.1
38
39
class _OverlappedFuture(futures.Future):
    """Future tied to one overlapped operation.

    A reference to the Overlapped object is kept in ``_ov`` until the future
    gets a result, an exception, or is cancelled.  Cancelling the future
    cancels the overlapped operation right away.
    """

    def __init__(self, ov, loop=None):
        """Wrap the Overlapped object *ov* in a future bound to *loop*."""
        super(_OverlappedFuture, self).__init__(loop=loop)
        # Remove the last entry of the recorded source traceback, if there
        # is one (presumably the frame of this constructor).
        if self._source_traceback:
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr info with the state of the overlapped."""
        info = super(_OverlappedFuture, self)._repr_info()
        if self._ov is None:
            return info
        if self._ov.pending:
            state = 'pending'
        else:
            state = 'completed'
        info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if still referenced.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.  The reference to
        the Overlapped object is dropped in every case.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as err:
            details = {
                'message': 'Cancelling an overlapped future failed',
                'exception': err,
                'future': self,
            }
            if self._source_traceback:
                details['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(details)
        self._ov = None

    def cancel(self):
        """Cancel the overlapped operation first, then the future itself."""
        self._cancel_overlapped()
        return super(_OverlappedFuture, self).cancel()

    def set_exception(self, exception):
        """Store *exception*, then cancel the overlapped operation."""
        super(_OverlappedFuture, self).set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Store *result* and release the Overlapped object."""
        super(_OverlappedFuture, self).set_result(result)
        # The operation is finished: nothing is left to cancel.
        self._ov = None
86
87
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    The wait is unregistered (once) when the future is cancelled or when a
    result or an exception is set.
    """

    def __init__(self, ov, handle, wait_handle, loop=None):
        """Store the Overlapped object, the waited handle and the wait handle.

        *ov* is the Overlapped object associated with the wait, *handle* the
        handle being waited on and *wait_handle* the value later passed to
        UnregisterWait().
        """
        super(_BaseWaitHandleFuture, self).__init__(loop=loop)
        # Remove the last entry of the recorded source traceback, if any
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr info with the handle, its state and the
        wait handle (each only when set)."""
        info = super(_BaseWaitHandleFuture, self)._repr_info()
        # Format the handle only when it is set: '%#x' % None raises
        # TypeError, which would make repr() of the future fail.
        if self._handle is not None:
            info.append('handle=%#x' % self._handle)
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append('wait_handle=%#x' % self._wait_handle)
        return info

    def _unregister_wait_cb(self, fut):
        """Release the Overlapped object; *fut* is unused here."""
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; only the first call has an effect.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the Overlapped object is then kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super(_BaseWaitHandleFuture, self).cancel()

    def set_exception(self, exception):
        """Unregister the wait, then store *exception*."""
        self._unregister_wait()
        super(_BaseWaitHandleFuture, self).set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then store *result*."""
        self._unregister_wait()
        super(_BaseWaitHandleFuture, self).set_result(result)
160
161
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture to complete.

    It cannot be cancelled itself.
    """

    def __init__(self, ov, event, wait_handle, loop=None):
        """Wait on *event*; see _BaseWaitHandleFuture for other arguments."""
        super(_WaitCancelFuture, self).__init__(
            ov, event, wait_handle, loop=loop)

        # Optional callable invoked directly, with this future as argument,
        # by _schedule_callbacks().
        self._done_callback = None

    def cancel(self):
        """Always raise RuntimeError: this future must not be cancelled."""
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def _schedule_callbacks(self):
        """Schedule the regular callbacks, then call ``_done_callback``."""
        super(_WaitCancelFuture, self)._schedule_callbacks()
        done_callback = self._done_callback
        if done_callback is None:
            return
        done_callback(self)
180
181
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Future waiting for a handle on behalf of an IocpProactor.

    Unregistering the wait is done in two steps: UnregisterWaitEx() is
    called with an event, then the proactor is asked (_wait_cancel()) to
    call _unregister_wait_cb() once the wait on that event completes.
    """

    def __init__(self, ov, handle, wait_handle, proactor, loop=None):
        """Like the base class, plus the owning *proactor*."""
        super(_WaitHandleFuture, self).__init__(
            ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx() by _unregister_wait()
        self._event = _overlapped.CreateEvent(None, True, False, None)
        # Future returned by the proactor for the wait on the event
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        """Final cleanup: close the event, unregister the Overlapped object
        from the proactor and release it."""
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # A cancelled wait may never be signalled, so the Overlapped object
        # has to be unregistered explicitly; otherwise IocpProactor.close()
        # would wait forever for an event which will never come.
        #
        # When the IocpProactor already received the event, calling
        # _unregister() is still safe: a reference to the Overlapped object,
        # which is used as an unique key, was kept.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super(_WaitHandleFuture, self)._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Start unregistering the wait; only the first call has an effect.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the cleanup is then not scheduled.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as err:
            # ERROR_IO_PENDING is not an error, the wait was unregistered
            unregistered = (err.winerror == _overlapped.ERROR_IO_PENDING)
            if not unregistered:
                details = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': err,
                    'future': self,
                }
                if self._source_traceback:
                    details['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(details)
                return

        # _unregister_wait_cb() runs when the wait on the event completes
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
233
234
class PipeServer(object):
    """Server side of a named pipe.

    It plays the same role as a bound, listening socket.
    """
    def __init__(self, address):
        """Create the first pipe instance for *address*."""
        self._address = address
        self._free_instances = weakref.WeakSet()
        # _server_pipe_handle() can raise and the destructor calls close():
        # make sure the attributes read by close() exist before calling it.
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current free pipe instance after creating its
        replacement."""
        # A new instance is created before the previous one is handed out,
        # so (until the server is closed) at least one pipe handle always
        # exists for the address and a connecting client does not fail with
        # FileNotFoundError.
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a PipeHandle wrapping a new pipe instance, or None if the
        server is closed.

        *first* requests the FILE_FLAG_FIRST_PIPE_INSTANCE flag.
        """
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        handle = wrap_error(
            _winapi.CreateNamedPipe,
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(handle)
        # Remember the instance so that close() can close it
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close the unconnected instances."""
        if self._accept_pipe_future is not None:
            self._accept_pipe_future.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            # Already closed
            return
        # Close all instances which have not been connected to by a client.
        for instance in self._free_instances:
            instance.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
292
293
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop."""

    def _socketpair(self):
        """Return the result of windows_utils.socketpair()."""
        return windows_utils.socketpair()
299
300
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on IOCP."""

    def __init__(self, proactor=None):
        """Use *proactor*, or a new IocpProactor when it is None."""
        chosen = IocpProactor() if proactor is None else proactor
        super(ProactorEventLoop, self).__init__(chosen)

    def _socketpair(self):
        """Return the result of windows_utils.socketpair()."""
        return windows_utils.socketpair()

    @coroutine
    def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Coroutine returning a ``(transport, protocol)`` pair, the protocol
        being created by calling *protocol_factory*.
        """
        pipe = yield From(self._proactor.connect_pipe(address))
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        raise Return(transport, protocol)

    @coroutine
    def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at *address*.

        Return a list containing the PipeServer.  A protocol is created
        with *protocol_factory* for every accepted client.
        """
        server = PipeServer(address)

        def loop_accept_pipe(done_fut=None):
            # Called once without argument to start the first accept, then
            # as done callback of each accept future.
            pipe = None
            try:
                if done_fut:
                    pipe = done_fut.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # The client connected before the server was
                        # closed: drop it (close the pipe) and stop here
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    # The server is closed: no new accept
                    return

                next_fut = self._proactor.accept_pipe(pipe)
            except OSError as err:
                if pipe and pipe.fileno() != -1:
                    details = {
                        'message': 'Pipe accept failed',
                        'exception': err,
                        'pipe': pipe,
                    }
                    self.call_exception_handler(details)
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Keep the pending accept so that the server can cancel it
                server._accept_pipe_future = next_fut
                next_fut.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting raises, the transport is closed and waited for, then the
        exception is raised again.
        """
        waiter = futures.Future(loop=self)
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell, stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra, **kwargs)
        failure = None
        try:
            yield From(waiter)
        except Exception as exc:
            # Workaround CPython bug #23353: using yield/yield-from in an
            # except block of a generator doesn't clear properly
            # sys.exc_info().  Only remember the exception here and do the
            # cleanup outside of the except block.
            failure = exc

        if failure is None:
            raise Return(transport)

        transport.close()
        yield From(transport._wait())
        raise failure
392
393
class IocpProactor(object):
    """Proactor implementation using IOCP.

    Each started operation is recorded in ``self._cache`` under the address
    of its Overlapped object, as a ``(future, ov, obj, callback)`` tuple,
    until _poll() dequeues its completion from the completion port.
    """

    def __init__(self, concurrency=0xffffffff):
        """Create the I/O completion port.

        *concurrency* is passed as last argument of
        CreateIoCompletionPort().
        """
        # Set every plain attribute before creating the completion port:
        # CreateIoCompletionPort() can raise and __del__() calls close(),
        # which reads these attributes.
        self._loop = None
        # Futures completed by _poll(), handed out by select()
        self._results = []
        self._iocp = None
        # Overlapped address => (future, ov, obj, callback)
        self._cache = {}
        # Objects whose handle is associated with the completion port
        self._registered = weakref.WeakSet()
        # Overlapped objects to remove from the cache at the end of _poll()
        self._unregistered = []
        # Objects passed to _stop_serving()
        self._stopped_serving = weakref.WeakSet()
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)

    def __repr__(self):
        return ('<%s overlapped#=%s result#=%s>'
                % (self.__class__.__name__, len(self._cache),
                   len(self._results)))

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the futures completed by _poll() and reset the list.

        The completion port is only polled (with *timeout*, in seconds)
        when no completed future is already queued.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a new future already completed with *value*."""
        fut = futures.Future(loop=self._loop)
        fut.set_result(value)
        return fut

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped read of up to *nbytes* on *conn*.

        WSARecv() is used for sockets and ReadFile() for other objects.
        Return a future; if starting the read raises BrokenPipeError, the
        future is already completed with ``b''``.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags)
            else:
                wrap_error(ov.ReadFile, conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        def finish_recv(trans, key, ov):
            # ERROR_NETNAME_DELETED is reported as ConnectionResetError
            try:
                return wrap_error(ov.getresult)
            except WindowsError as exc:
                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_recv)

    def send(self, conn, buf, flags=0):
        """Start an overlapped write of *buf* on *conn*.

        WSASend() is used for sockets and WriteFile() for other objects.
        Return a future for the result of the operation.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        def finish_send(trans, key, ov):
            # ERROR_NETNAME_DELETED is reported as ConnectionResetError
            try:
                return wrap_error(ov.getresult)
            except WindowsError as exc:
                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_send)

    def accept(self, listener):
        """Start an overlapped accept on the socket *listener*.

        Return a future whose result is ``(conn, conn.getpeername())``.  If
        the future is cancelled, the accept socket is closed.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            wrap_error(ov.getresult)
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        @coroutine
        def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                yield From(future)
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start an overlapped connect of the socket *conn* to *address*.

        Return a future whose result is *conn*.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except WindowsError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            wrap_error(ov.getresult)
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to the server pipe *pipe*.

        Return a future whose result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            wrap_error(ov.getresult)
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    @coroutine
    def connect_pipe(self, address):
        """Connect to the pipe at *address*, retrying while it is busy.

        Coroutine returning a PipeHandle.  The delay between two attempts
        starts at CONNECT_PIPE_INIT_DELAY and is doubled after each retry,
        up to CONNECT_PIPE_MAX_DELAY.  Errors other than ERROR_PIPE_BUSY are
        propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to a
            # pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY
            try:
                handle = wrap_error(_overlapped.ConnectPipe, address)
                break
            except WindowsError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later.  Sleep
            # for the current delay before doubling it, so that the first
            # retry really waits CONNECT_PIPE_INIT_DELAY.
            yield From(tasks.sleep(delay, loop=self._loop))
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)

        raise Return(windows_utils.PipeHandle(handle))

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* and call *done_callback* with the future when
        the wait is done.  Return the _WaitCancelFuture."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* and return its future.

        *timeout* is in seconds, None meaning no timeout.  The future is a
        _WaitCancelFuture if *_is_cancel* is true, a _WaitHandleFuture
        otherwise.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = int(math.ceil(timeout * 1e3))

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        """Associate the handle of *obj* with the completion port (once)."""
        # To get notifications of finished ops on this objects sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            # Only remember the object once the association succeeded: if
            # CreateIoCompletionPort() raises, a later call tries again
            # instead of wrongly considering the handle as registered.
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            self._registered.add(obj)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Return a future for the overlapped operation *ov* on *obj*.

        *callback* is called as ``callback(transferred, key, ov)`` to
        compute the result of the future.
        """
        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        # The cache entry is removed at the end of the next _poll()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of *family* with a timeout of 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Dequeue completions from the port and complete their futures.

        Wait up to *timeout* seconds (forever if None) for a first
        completion, then drain the ones already queued without waiting.
        Completed futures are appended to ``self._results``.

        Return True if at least one completion was dequeued, False if the
        wait ended without any.  Raise ValueError if *timeout* is negative
        or too big.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = int(math.ceil(timeout * 1e3))
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        dequeued = False
        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            dequeued = True
            # Only the first call waits; the next ones just drain the queue
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        del self._unregistered[:]

        return dequeued

    def _stop_serving(self, obj):
        """Mark *obj* so that _poll() cancels its completed operations."""
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the registered operations, wait until the cache is empty
        and close the completion port."""
        # Cancel remaining registered operations.
        for address, (fut, ov, obj, callback) in list(self._cache.items()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        while self._cache:
            # _poll() returns False when nothing was dequeued during the
            # whole second: only then is the close really taking long.
            if not self._poll(1):
                logger.debug('taking long time to close proactor')

        self._results = []
        if self._iocp is not None:
            _winapi.CloseHandle(self._iocp)
            self._iocp = None

    def __del__(self):
        self.close()
753
754
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport notified of the process exit by the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the process and wait for its handle in the proactor."""
        self._proc = windows_utils.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            bufsize=bufsize,
            **kwargs)

        def _on_handle_signaled(fut):
            # The wait on the process handle is done: report the return code
            self._process_exited(self._proc.poll())

        exit_waiter = self._loop._proactor.wait_for_handle(
            int(self._proc._handle))
        exit_waiter.add_done_callback(_on_handle_signaled)
768
769
# Public name of the selector event loop: the Windows implementation.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy on Windows."""
    # The policy uses the selector event loop, not the proactor one.
    # NOTE(review): presumably the base policy calls _loop_factory to create
    # new event loops -- confirm in events.BaseDefaultEventLoopPolicy.
    _loop_factory = SelectorEventLoop


# Public name of the default event loop policy.
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy
778