1"""Selector and proactor event loops for Windows."""
2
3import _overlapped
4import _winapi
5import errno
6import math
7import msvcrt
8import socket
9import struct
10import time
11import weakref
12
13from . import events
14from . import base_subprocess
15from . import futures
16from . import exceptions
17from . import proactor_events
18from . import selector_events
19from . import tasks
20from . import windows_utils
21from .log import logger
22
23
24__all__ = (
25    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
26    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
27    'WindowsProactorEventLoopPolicy',
28)
29
30
31NULL = 0
32INFINITE = 0xffffffff
33ERROR_CONNECTION_REFUSED = 1225
34ERROR_CONNECTION_ABORTED = 1236
35
36# Initial delay in seconds for connect_pipe() before retrying to connect
37CONNECT_PIPE_INIT_DELAY = 0.001
38
39# Maximum delay in seconds for connect_pipe() before retrying to connect
40CONNECT_PIPE_MAX_DELAY = 0.100
41
42
43class _OverlappedFuture(futures.Future):
44    """Subclass of Future which represents an overlapped operation.
45
46    Cancelling it will immediately cancel the overlapped operation.
47    """
48
49    def __init__(self, ov, *, loop=None):
50        super().__init__(loop=loop)
51        if self._source_traceback:
52            del self._source_traceback[-1]
53        self._ov = ov
54
55    def _repr_info(self):
56        info = super()._repr_info()
57        if self._ov is not None:
58            state = 'pending' if self._ov.pending else 'completed'
59            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
60        return info
61
62    def _cancel_overlapped(self):
63        if self._ov is None:
64            return
65        try:
66            self._ov.cancel()
67        except OSError as exc:
68            context = {
69                'message': 'Cancelling an overlapped future failed',
70                'exception': exc,
71                'future': self,
72            }
73            if self._source_traceback:
74                context['source_traceback'] = self._source_traceback
75            self._loop.call_exception_handler(context)
76        self._ov = None
77
78    def cancel(self):
79        self._cancel_overlapped()
80        return super().cancel()
81
82    def set_exception(self, exception):
83        super().set_exception(exception)
84        self._cancel_overlapped()
85
86    def set_result(self, result):
87        super().set_result(result)
88        self._ov = None
89
90
91class _BaseWaitHandleFuture(futures.Future):
92    """Subclass of Future which represents a wait handle."""
93
94    def __init__(self, ov, handle, wait_handle, *, loop=None):
95        super().__init__(loop=loop)
96        if self._source_traceback:
97            del self._source_traceback[-1]
98        # Keep a reference to the Overlapped object to keep it alive until the
99        # wait is unregistered
100        self._ov = ov
101        self._handle = handle
102        self._wait_handle = wait_handle
103
104        # Should we call UnregisterWaitEx() if the wait completes
105        # or is cancelled?
106        self._registered = True
107
108    def _poll(self):
109        # non-blocking wait: use a timeout of 0 millisecond
110        return (_winapi.WaitForSingleObject(self._handle, 0) ==
111                _winapi.WAIT_OBJECT_0)
112
113    def _repr_info(self):
114        info = super()._repr_info()
115        info.append(f'handle={self._handle:#x}')
116        if self._handle is not None:
117            state = 'signaled' if self._poll() else 'waiting'
118            info.append(state)
119        if self._wait_handle is not None:
120            info.append(f'wait_handle={self._wait_handle:#x}')
121        return info
122
123    def _unregister_wait_cb(self, fut):
124        # The wait was unregistered: it's not safe to destroy the Overlapped
125        # object
126        self._ov = None
127
128    def _unregister_wait(self):
129        if not self._registered:
130            return
131        self._registered = False
132
133        wait_handle = self._wait_handle
134        self._wait_handle = None
135        try:
136            _overlapped.UnregisterWait(wait_handle)
137        except OSError as exc:
138            if exc.winerror != _overlapped.ERROR_IO_PENDING:
139                context = {
140                    'message': 'Failed to unregister the wait handle',
141                    'exception': exc,
142                    'future': self,
143                }
144                if self._source_traceback:
145                    context['source_traceback'] = self._source_traceback
146                self._loop.call_exception_handler(context)
147                return
148            # ERROR_IO_PENDING means that the unregister is pending
149
150        self._unregister_wait_cb(None)
151
152    def cancel(self):
153        self._unregister_wait()
154        return super().cancel()
155
156    def set_exception(self, exception):
157        self._unregister_wait()
158        super().set_exception(exception)
159
160    def set_result(self, result):
161        self._unregister_wait()
162        super().set_result(result)
163
164
165class _WaitCancelFuture(_BaseWaitHandleFuture):
166    """Subclass of Future which represents a wait for the cancellation of a
167    _WaitHandleFuture using an event.
168    """
169
170    def __init__(self, ov, event, wait_handle, *, loop=None):
171        super().__init__(ov, event, wait_handle, loop=loop)
172
173        self._done_callback = None
174
175    def cancel(self):
176        raise RuntimeError("_WaitCancelFuture must not be cancelled")
177
178    def set_result(self, result):
179        super().set_result(result)
180        if self._done_callback is not None:
181            self._done_callback(self)
182
183    def set_exception(self, exception):
184        super().set_exception(exception)
185        if self._done_callback is not None:
186            self._done_callback(self)
187
188
189class _WaitHandleFuture(_BaseWaitHandleFuture):
190    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
191        super().__init__(ov, handle, wait_handle, loop=loop)
192        self._proactor = proactor
193        self._unregister_proactor = True
194        self._event = _overlapped.CreateEvent(None, True, False, None)
195        self._event_fut = None
196
197    def _unregister_wait_cb(self, fut):
198        if self._event is not None:
199            _winapi.CloseHandle(self._event)
200            self._event = None
201            self._event_fut = None
202
203        # If the wait was cancelled, the wait may never be signalled, so
204        # it's required to unregister it. Otherwise, IocpProactor.close() will
205        # wait forever for an event which will never come.
206        #
207        # If the IocpProactor already received the event, it's safe to call
208        # _unregister() because we kept a reference to the Overlapped object
209        # which is used as a unique key.
210        self._proactor._unregister(self._ov)
211        self._proactor = None
212
213        super()._unregister_wait_cb(fut)
214
215    def _unregister_wait(self):
216        if not self._registered:
217            return
218        self._registered = False
219
220        wait_handle = self._wait_handle
221        self._wait_handle = None
222        try:
223            _overlapped.UnregisterWaitEx(wait_handle, self._event)
224        except OSError as exc:
225            if exc.winerror != _overlapped.ERROR_IO_PENDING:
226                context = {
227                    'message': 'Failed to unregister the wait handle',
228                    'exception': exc,
229                    'future': self,
230                }
231                if self._source_traceback:
232                    context['source_traceback'] = self._source_traceback
233                self._loop.call_exception_handler(context)
234                return
235            # ERROR_IO_PENDING is not an error, the wait was unregistered
236
237        self._event_fut = self._proactor._wait_cancel(self._event,
238                                                      self._unregister_wait_cb)
239
240
241class PipeServer(object):
242    """Class representing a pipe server.
243
244    This is much like a bound, listening socket.
245    """
246    def __init__(self, address):
247        self._address = address
248        self._free_instances = weakref.WeakSet()
249        # initialize the pipe attribute before calling _server_pipe_handle()
250        # because this function can raise an exception and the destructor calls
251        # the close() method
252        self._pipe = None
253        self._accept_pipe_future = None
254        self._pipe = self._server_pipe_handle(True)
255
256    def _get_unconnected_pipe(self):
257        # Create new instance and return previous one.  This ensures
258        # that (until the server is closed) there is always at least
259        # one pipe handle for address.  Therefore if a client attempt
260        # to connect it will not fail with FileNotFoundError.
261        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
262        return tmp
263
264    def _server_pipe_handle(self, first):
265        # Return a wrapper for a new pipe handle.
266        if self.closed():
267            return None
268        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
269        if first:
270            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
271        h = _winapi.CreateNamedPipe(
272            self._address, flags,
273            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
274            _winapi.PIPE_WAIT,
275            _winapi.PIPE_UNLIMITED_INSTANCES,
276            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
277            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
278        pipe = windows_utils.PipeHandle(h)
279        self._free_instances.add(pipe)
280        return pipe
281
282    def closed(self):
283        return (self._address is None)
284
285    def close(self):
286        if self._accept_pipe_future is not None:
287            self._accept_pipe_future.cancel()
288            self._accept_pipe_future = None
289        # Close all instances which have not been connected to by a client.
290        if self._address is not None:
291            for pipe in self._free_instances:
292                pipe.close()
293            self._pipe = None
294            self._address = None
295            self._free_instances.clear()
296
297    __del__ = close
298
299
300class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
301    """Windows version of selector event loop."""
302
303
304class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
305    """Windows version of proactor event loop using IOCP."""
306
307    def __init__(self, proactor=None):
308        if proactor is None:
309            proactor = IocpProactor()
310        super().__init__(proactor)
311
312    def run_forever(self):
313        try:
314            assert self._self_reading_future is None
315            self.call_soon(self._loop_self_reading)
316            super().run_forever()
317        finally:
318            if self._self_reading_future is not None:
319                ov = self._self_reading_future._ov
320                self._self_reading_future.cancel()
321                # self_reading_future was just cancelled so if it hasn't been
322                # finished yet, it never will be (it's possible that it has
323                # already finished and its callback is waiting in the queue,
324                # where it could still happen if the event loop is restarted).
325                # Unregister it otherwise IocpProactor.close will wait for it
326                # forever
327                if ov is not None:
328                    self._proactor._unregister(ov)
329                self._self_reading_future = None
330
331    async def create_pipe_connection(self, protocol_factory, address):
332        f = self._proactor.connect_pipe(address)
333        pipe = await f
334        protocol = protocol_factory()
335        trans = self._make_duplex_pipe_transport(pipe, protocol,
336                                                 extra={'addr': address})
337        return trans, protocol
338
339    async def start_serving_pipe(self, protocol_factory, address):
340        server = PipeServer(address)
341
342        def loop_accept_pipe(f=None):
343            pipe = None
344            try:
345                if f:
346                    pipe = f.result()
347                    server._free_instances.discard(pipe)
348
349                    if server.closed():
350                        # A client connected before the server was closed:
351                        # drop the client (close the pipe) and exit
352                        pipe.close()
353                        return
354
355                    protocol = protocol_factory()
356                    self._make_duplex_pipe_transport(
357                        pipe, protocol, extra={'addr': address})
358
359                pipe = server._get_unconnected_pipe()
360                if pipe is None:
361                    return
362
363                f = self._proactor.accept_pipe(pipe)
364            except OSError as exc:
365                if pipe and pipe.fileno() != -1:
366                    self.call_exception_handler({
367                        'message': 'Pipe accept failed',
368                        'exception': exc,
369                        'pipe': pipe,
370                    })
371                    pipe.close()
372                elif self._debug:
373                    logger.warning("Accept pipe failed on pipe %r",
374                                   pipe, exc_info=True)
375            except exceptions.CancelledError:
376                if pipe:
377                    pipe.close()
378            else:
379                server._accept_pipe_future = f
380                f.add_done_callback(loop_accept_pipe)
381
382        self.call_soon(loop_accept_pipe)
383        return [server]
384
385    async def _make_subprocess_transport(self, protocol, args, shell,
386                                         stdin, stdout, stderr, bufsize,
387                                         extra=None, **kwargs):
388        waiter = self.create_future()
389        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
390                                             stdin, stdout, stderr, bufsize,
391                                             waiter=waiter, extra=extra,
392                                             **kwargs)
393        try:
394            await waiter
395        except (SystemExit, KeyboardInterrupt):
396            raise
397        except BaseException:
398            transp.close()
399            await transp._wait()
400            raise
401
402        return transp
403
404
405class IocpProactor:
406    """Proactor implementation using IOCP."""
407
408    def __init__(self, concurrency=0xffffffff):
409        self._loop = None
410        self._results = []
411        self._iocp = _overlapped.CreateIoCompletionPort(
412            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
413        self._cache = {}
414        self._registered = weakref.WeakSet()
415        self._unregistered = []
416        self._stopped_serving = weakref.WeakSet()
417
418    def _check_closed(self):
419        if self._iocp is None:
420            raise RuntimeError('IocpProactor is closed')
421
422    def __repr__(self):
423        info = ['overlapped#=%s' % len(self._cache),
424                'result#=%s' % len(self._results)]
425        if self._iocp is None:
426            info.append('closed')
427        return '<%s %s>' % (self.__class__.__name__, " ".join(info))
428
429    def set_loop(self, loop):
430        self._loop = loop
431
432    def select(self, timeout=None):
433        if not self._results:
434            self._poll(timeout)
435        tmp = self._results
436        self._results = []
437        return tmp
438
439    def _result(self, value):
440        fut = self._loop.create_future()
441        fut.set_result(value)
442        return fut
443
444    def recv(self, conn, nbytes, flags=0):
445        self._register_with_iocp(conn)
446        ov = _overlapped.Overlapped(NULL)
447        try:
448            if isinstance(conn, socket.socket):
449                ov.WSARecv(conn.fileno(), nbytes, flags)
450            else:
451                ov.ReadFile(conn.fileno(), nbytes)
452        except BrokenPipeError:
453            return self._result(b'')
454
455        def finish_recv(trans, key, ov):
456            try:
457                return ov.getresult()
458            except OSError as exc:
459                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
460                                    _overlapped.ERROR_OPERATION_ABORTED):
461                    raise ConnectionResetError(*exc.args)
462                else:
463                    raise
464
465        return self._register(ov, conn, finish_recv)
466
467    def recv_into(self, conn, buf, flags=0):
468        self._register_with_iocp(conn)
469        ov = _overlapped.Overlapped(NULL)
470        try:
471            if isinstance(conn, socket.socket):
472                ov.WSARecvInto(conn.fileno(), buf, flags)
473            else:
474                ov.ReadFileInto(conn.fileno(), buf)
475        except BrokenPipeError:
476            return self._result(0)
477
478        def finish_recv(trans, key, ov):
479            try:
480                return ov.getresult()
481            except OSError as exc:
482                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
483                                    _overlapped.ERROR_OPERATION_ABORTED):
484                    raise ConnectionResetError(*exc.args)
485                else:
486                    raise
487
488        return self._register(ov, conn, finish_recv)
489
490    def recvfrom(self, conn, nbytes, flags=0):
491        self._register_with_iocp(conn)
492        ov = _overlapped.Overlapped(NULL)
493        try:
494            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
495        except BrokenPipeError:
496            return self._result((b'', None))
497
498        def finish_recv(trans, key, ov):
499            try:
500                return ov.getresult()
501            except OSError as exc:
502                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
503                                    _overlapped.ERROR_OPERATION_ABORTED):
504                    raise ConnectionResetError(*exc.args)
505                else:
506                    raise
507
508        return self._register(ov, conn, finish_recv)
509
510    def sendto(self, conn, buf, flags=0, addr=None):
511        self._register_with_iocp(conn)
512        ov = _overlapped.Overlapped(NULL)
513
514        ov.WSASendTo(conn.fileno(), buf, flags, addr)
515
516        def finish_send(trans, key, ov):
517            try:
518                return ov.getresult()
519            except OSError as exc:
520                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
521                                    _overlapped.ERROR_OPERATION_ABORTED):
522                    raise ConnectionResetError(*exc.args)
523                else:
524                    raise
525
526        return self._register(ov, conn, finish_send)
527
528    def send(self, conn, buf, flags=0):
529        self._register_with_iocp(conn)
530        ov = _overlapped.Overlapped(NULL)
531        if isinstance(conn, socket.socket):
532            ov.WSASend(conn.fileno(), buf, flags)
533        else:
534            ov.WriteFile(conn.fileno(), buf)
535
536        def finish_send(trans, key, ov):
537            try:
538                return ov.getresult()
539            except OSError as exc:
540                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
541                                    _overlapped.ERROR_OPERATION_ABORTED):
542                    raise ConnectionResetError(*exc.args)
543                else:
544                    raise
545
546        return self._register(ov, conn, finish_send)
547
548    def accept(self, listener):
549        self._register_with_iocp(listener)
550        conn = self._get_accept_socket(listener.family)
551        ov = _overlapped.Overlapped(NULL)
552        ov.AcceptEx(listener.fileno(), conn.fileno())
553
554        def finish_accept(trans, key, ov):
555            ov.getresult()
556            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
557            buf = struct.pack('@P', listener.fileno())
558            conn.setsockopt(socket.SOL_SOCKET,
559                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
560            conn.settimeout(listener.gettimeout())
561            return conn, conn.getpeername()
562
563        async def accept_coro(future, conn):
564            # Coroutine closing the accept socket if the future is cancelled
565            try:
566                await future
567            except exceptions.CancelledError:
568                conn.close()
569                raise
570
571        future = self._register(ov, listener, finish_accept)
572        coro = accept_coro(future, conn)
573        tasks.ensure_future(coro, loop=self._loop)
574        return future
575
576    def connect(self, conn, address):
577        if conn.type == socket.SOCK_DGRAM:
578            # WSAConnect will complete immediately for UDP sockets so we don't
579            # need to register any IOCP operation
580            _overlapped.WSAConnect(conn.fileno(), address)
581            fut = self._loop.create_future()
582            fut.set_result(None)
583            return fut
584
585        self._register_with_iocp(conn)
586        # The socket needs to be locally bound before we call ConnectEx().
587        try:
588            _overlapped.BindLocal(conn.fileno(), conn.family)
589        except OSError as e:
590            if e.winerror != errno.WSAEINVAL:
591                raise
592            # Probably already locally bound; check using getsockname().
593            if conn.getsockname()[1] == 0:
594                raise
595        ov = _overlapped.Overlapped(NULL)
596        ov.ConnectEx(conn.fileno(), address)
597
598        def finish_connect(trans, key, ov):
599            ov.getresult()
600            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
601            conn.setsockopt(socket.SOL_SOCKET,
602                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
603            return conn
604
605        return self._register(ov, conn, finish_connect)
606
607    def sendfile(self, sock, file, offset, count):
608        self._register_with_iocp(sock)
609        ov = _overlapped.Overlapped(NULL)
610        offset_low = offset & 0xffff_ffff
611        offset_high = (offset >> 32) & 0xffff_ffff
612        ov.TransmitFile(sock.fileno(),
613                        msvcrt.get_osfhandle(file.fileno()),
614                        offset_low, offset_high,
615                        count, 0, 0)
616
617        def finish_sendfile(trans, key, ov):
618            try:
619                return ov.getresult()
620            except OSError as exc:
621                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
622                                    _overlapped.ERROR_OPERATION_ABORTED):
623                    raise ConnectionResetError(*exc.args)
624                else:
625                    raise
626        return self._register(ov, sock, finish_sendfile)
627
628    def accept_pipe(self, pipe):
629        self._register_with_iocp(pipe)
630        ov = _overlapped.Overlapped(NULL)
631        connected = ov.ConnectNamedPipe(pipe.fileno())
632
633        if connected:
634            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
635            # that the pipe is connected. There is no need to wait for the
636            # completion of the connection.
637            return self._result(pipe)
638
639        def finish_accept_pipe(trans, key, ov):
640            ov.getresult()
641            return pipe
642
643        return self._register(ov, pipe, finish_accept_pipe)
644
645    async def connect_pipe(self, address):
646        delay = CONNECT_PIPE_INIT_DELAY
647        while True:
648            # Unfortunately there is no way to do an overlapped connect to
649            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
650            # ERROR_PIPE_BUSY.
651            try:
652                handle = _overlapped.ConnectPipe(address)
653                break
654            except OSError as exc:
655                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
656                    raise
657
658            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
659            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
660            await tasks.sleep(delay)
661
662        return windows_utils.PipeHandle(handle)
663
664    def wait_for_handle(self, handle, timeout=None):
665        """Wait for a handle.
666
667        Return a Future object. The result of the future is True if the wait
668        completed, or False if the wait did not complete (on timeout).
669        """
670        return self._wait_for_handle(handle, timeout, False)
671
672    def _wait_cancel(self, event, done_callback):
673        fut = self._wait_for_handle(event, None, True)
674        # add_done_callback() cannot be used because the wait may only complete
675        # in IocpProactor.close(), while the event loop is not running.
676        fut._done_callback = done_callback
677        return fut
678
679    def _wait_for_handle(self, handle, timeout, _is_cancel):
680        self._check_closed()
681
682        if timeout is None:
683            ms = _winapi.INFINITE
684        else:
685            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
686            # round away from zero to wait *at least* timeout seconds.
687            ms = math.ceil(timeout * 1e3)
688
689        # We only create ov so we can use ov.address as a key for the cache.
690        ov = _overlapped.Overlapped(NULL)
691        wait_handle = _overlapped.RegisterWaitWithQueue(
692            handle, self._iocp, ov.address, ms)
693        if _is_cancel:
694            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
695        else:
696            f = _WaitHandleFuture(ov, handle, wait_handle, self,
697                                  loop=self._loop)
698        if f._source_traceback:
699            del f._source_traceback[-1]
700
701        def finish_wait_for_handle(trans, key, ov):
702            # Note that this second wait means that we should only use
703            # this with handles types where a successful wait has no
704            # effect.  So events or processes are all right, but locks
705            # or semaphores are not.  Also note if the handle is
706            # signalled and then quickly reset, then we may return
707            # False even though we have not timed out.
708            return f._poll()
709
710        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
711        return f
712
713    def _register_with_iocp(self, obj):
714        # To get notifications of finished ops on this objects sent to the
715        # completion port, were must register the handle.
716        if obj not in self._registered:
717            self._registered.add(obj)
718            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
719            # XXX We could also use SetFileCompletionNotificationModes()
720            # to avoid sending notifications to completion port of ops
721            # that succeed immediately.
722
723    def _register(self, ov, obj, callback):
724        self._check_closed()
725
726        # Return a future which will be set with the result of the
727        # operation when it completes.  The future's value is actually
728        # the value returned by callback().
729        f = _OverlappedFuture(ov, loop=self._loop)
730        if f._source_traceback:
731            del f._source_traceback[-1]
732        if not ov.pending:
733            # The operation has completed, so no need to postpone the
734            # work.  We cannot take this short cut if we need the
735            # NumberOfBytes, CompletionKey values returned by
736            # PostQueuedCompletionStatus().
737            try:
738                value = callback(None, None, ov)
739            except OSError as e:
740                f.set_exception(e)
741            else:
742                f.set_result(value)
743            # Even if GetOverlappedResult() was called, we have to wait for the
744            # notification of the completion in GetQueuedCompletionStatus().
745            # Register the overlapped operation to keep a reference to the
746            # OVERLAPPED object, otherwise the memory is freed and Windows may
747            # read uninitialized memory.
748
749        # Register the overlapped operation for later.  Note that
750        # we only store obj to prevent it from being garbage
751        # collected too early.
752        self._cache[ov.address] = (f, ov, obj, callback)
753        return f
754
755    def _unregister(self, ov):
756        """Unregister an overlapped object.
757
758        Call this method when its future has been cancelled. The event can
759        already be signalled (pending in the proactor event queue). It is also
760        safe if the event is never signalled (because it was cancelled).
761        """
762        self._check_closed()
763        self._unregistered.append(ov)
764
765    def _get_accept_socket(self, family):
766        s = socket.socket(family)
767        s.settimeout(0)
768        return s
769
770    def _poll(self, timeout=None):
771        if timeout is None:
772            ms = INFINITE
773        elif timeout < 0:
774            raise ValueError("negative timeout")
775        else:
776            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
777            # round away from zero to wait *at least* timeout seconds.
778            ms = math.ceil(timeout * 1e3)
779            if ms >= INFINITE:
780                raise ValueError("timeout too big")
781
782        while True:
783            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
784            if status is None:
785                break
786            ms = 0
787
788            err, transferred, key, address = status
789            try:
790                f, ov, obj, callback = self._cache.pop(address)
791            except KeyError:
792                if self._loop.get_debug():
793                    self._loop.call_exception_handler({
794                        'message': ('GetQueuedCompletionStatus() returned an '
795                                    'unexpected event'),
796                        'status': ('err=%s transferred=%s key=%#x address=%#x'
797                                   % (err, transferred, key, address)),
798                    })
799
800                # key is either zero, or it is used to return a pipe
801                # handle which should be closed to avoid a leak.
802                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
803                    _winapi.CloseHandle(key)
804                continue
805
806            if obj in self._stopped_serving:
807                f.cancel()
808            # Don't call the callback if _register() already read the result or
809            # if the overlapped has been cancelled
810            elif not f.done():
811                try:
812                    value = callback(transferred, key, ov)
813                except OSError as e:
814                    f.set_exception(e)
815                    self._results.append(f)
816                else:
817                    f.set_result(value)
818                    self._results.append(f)
819
820        # Remove unregistered futures
821        for ov in self._unregistered:
822            self._cache.pop(ov.address, None)
823        self._unregistered.clear()
824
825    def _stop_serving(self, obj):
826        # obj is a socket or pipe handle.  It will be closed in
827        # BaseProactorEventLoop._stop_serving() which will make any
828        # pending operations fail quickly.
829        self._stopped_serving.add(obj)
830
831    def close(self):
832        if self._iocp is None:
833            # already closed
834            return
835
836        # Cancel remaining registered operations.
837        for address, (fut, ov, obj, callback) in list(self._cache.items()):
838            if fut.cancelled():
839                # Nothing to do with cancelled futures
840                pass
841            elif isinstance(fut, _WaitCancelFuture):
842                # _WaitCancelFuture must not be cancelled
843                pass
844            else:
845                try:
846                    fut.cancel()
847                except OSError as exc:
848                    if self._loop is not None:
849                        context = {
850                            'message': 'Cancelling a future failed',
851                            'exception': exc,
852                            'future': fut,
853                        }
854                        if fut._source_traceback:
855                            context['source_traceback'] = fut._source_traceback
856                        self._loop.call_exception_handler(context)
857
858        # Wait until all cancelled overlapped complete: don't exit with running
859        # overlapped to prevent a crash. Display progress every second if the
860        # loop is still running.
861        msg_update = 1.0
862        start_time = time.monotonic()
863        next_msg = start_time + msg_update
864        while self._cache:
865            if next_msg <= time.monotonic():
866                logger.debug('%r is running after closing for %.1f seconds',
867                             self, time.monotonic() - start_time)
868                next_msg = time.monotonic() + msg_update
869
870            # handle a few events, or timeout
871            self._poll(msg_update)
872
873        self._results = []
874
875        _winapi.CloseHandle(self._iocp)
876        self._iocp = None
877
878    def __del__(self):
879        self.close()
880
881
882class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
883
884    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
885        self._proc = windows_utils.Popen(
886            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
887            bufsize=bufsize, **kwargs)
888
889        def callback(f):
890            returncode = self._proc.poll()
891            self._process_exited(returncode)
892
893        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
894        f.add_done_callback(callback)
895
896
897SelectorEventLoop = _WindowsSelectorEventLoop
898
899
900class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
901    _loop_factory = SelectorEventLoop
902
903
904class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
905    _loop_factory = ProactorEventLoop
906
907
908DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
909