1"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer.  Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
7__all__ = 'BaseProactorEventLoop',
8
9import io
10import os
11import socket
12import warnings
13import signal
14import threading
15import collections
16
17from . import base_events
18from . import constants
19from . import futures
20from . import exceptions
21from . import protocols
22from . import sslproto
23from . import transports
24from . import trsock
25from .log import logger
26
27
def _set_socket_extra(transport, sock):
    """Record details about *sock* in the transport's extra-info mapping.

    Stores a ``TransportSocket`` wrapper under ``'socket'`` and, when
    ``getsockname()`` succeeds, the local address under ``'sockname'``.
    ``'peername'`` is only filled in when the mapping does not already
    contain one; it becomes ``None`` if ``getpeername()`` fails.
    """
    extra = transport._extra
    extra['socket'] = trsock.TransportSocket(sock)

    try:
        local_addr = sock.getsockname()
    except OSError:  # socket.error is an alias of OSError
        if transport._loop.get_debug():
            logger.warning(
                "getsockname() failed on %r", sock, exc_info=True)
    else:
        extra['sockname'] = local_addr

    if 'peername' in extra:
        # An entry is already present; leave it untouched.
        return

    try:
        peer_addr = sock.getpeername()
    except OSError:
        # UDP sockets may not have a peer name
        peer_addr = None
    extra['peername'] = peer_addr
44
45
class _ProactorBasePipeTransport(transports._FlowControlMixin,
                                 transports.BaseTransport):
    """Base class for pipe and socket transports.

    Keeps the state shared by the concrete transports (the wrapped
    socket or pipe, the in-flight read/write futures, the write buffer
    and the closing flags) and implements the common close,
    force-close and connection-lost sequence.
    """

    # _force_close() inspects this attribute, but only some subclasses
    # assign it in their __init__().  The class-level default keeps
    # _force_close() (and therefore _fatal_error()) usable on every
    # transport derived from this class.
    _empty_waiter = None

    # Becomes True on the first _call_connection_lost() so that a second
    # scheduled call is a no-op instead of touching a released socket.
    _called_connection_lost = False

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Wrap *sock* and schedule ``protocol.connection_made(self)``.

        If *server* is given, the transport attaches itself to it.  If
        *waiter* is given, it is resolved (unless cancelled) by a callback
        scheduled after the connection_made() call.
        """
        super().__init__(extra, loop)
        self._set_extra(sock)
        self._sock = sock
        self.set_protocol(protocol)
        self._server = server
        self._buffer = None  # None or bytearray.
        self._read_fut = None
        self._write_fut = None
        self._pending_write = 0
        self._conn_lost = 0
        self._closing = False  # Set when close() called.
        self._called_connection_lost = False
        self._eof_written = False
        if self._server is not None:
            self._server._attach()
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        """Return a debug representation listing the transport's state."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        if self._sock is not None:
            info.append(f'fd={self._sock.fileno()}')
        if self._read_fut is not None:
            info.append(f'read={self._read_fut!r}')
        if self._write_fut is not None:
            info.append(f'write={self._write_fut!r}')
        if self._buffer:
            info.append(f'write_bufsize={len(self._buffer)}')
        if self._eof_written:
            info.append('EOF written')
        return '<{}>'.format(' '.join(info))

    def _set_extra(self, sock):
        """Store *sock* in the extra-info mapping under ``'pipe'``."""
        self._extra['pipe'] = sock

    def set_protocol(self, protocol):
        """Replace the protocol that receives this transport's callbacks."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol currently attached to this transport."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or _force_close() has been called."""
        return self._closing

    def close(self):
        """Close the transport.

        If nothing is buffered and no write is in flight,
        connection_lost(None) is scheduled right away; otherwise that is
        left to the write path.  A pending read is cancelled in both
        cases.  Calling close() again is a no-op.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        if not self._buffer and self._write_fut is None:
            self._loop.call_soon(self._call_connection_lost, None)
        if self._read_fut is not None:
            self._read_fut.cancel()
            self._read_fut = None

    def __del__(self, _warn=warnings.warn):
        """Warn about, and release, a transport that was never closed."""
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            # Close the socket directly instead of calling self.close():
            # close() schedules a callback with loop.call_soon(), which
            # raises RuntimeError when the event loop is already closed
            # (a common situation while objects are being finalized).
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and force-close the transport.

        OSError instances are only logged (and only in debug mode); any
        other exception is passed to the loop's exception handler.  The
        transport is force-closed with *exc* in every case.
        """
        try:
            if isinstance(exc, OSError):
                if self._loop.get_debug():
                    logger.debug("%r: %s", self, message, exc_info=True)
            else:
                self._loop.call_exception_handler({
                    'message': message,
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })
        finally:
            self._force_close(exc)

    def _force_close(self, exc):
        """Close immediately, discarding buffered data.

        A pending empty-buffer waiter is resolved first (with *exc* when
        given).  Unless the transport is already closing, the in-flight
        read and write futures are cancelled, the buffer is dropped and
        connection_lost(exc) is scheduled.
        """
        if self._empty_waiter is not None and not self._empty_waiter.done():
            if exc is None:
                self._empty_waiter.set_result(None)
            else:
                self._empty_waiter.set_exception(exc)
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        if self._write_fut:
            self._write_fut.cancel()
            self._write_fut = None
        if self._read_fut:
            self._read_fut.cancel()
            self._read_fut = None
        self._pending_write = 0
        self._buffer = None
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol and release the socket and the server link.

        Only the first call does any work; later calls return at once.
        The socket is closed and the server detached even if
        ``connection_lost()`` or ``shutdown()`` raises.
        """
        if self._called_connection_lost:
            return
        self._called_connection_lost = True
        try:
            self._protocol.connection_lost(exc)
        finally:
            sock = self._sock
            try:
                # XXX If there is a pending overlapped read on the other
                # end then it may fail with ERROR_NETNAME_DELETED if we
                # just close our end.  First calling shutdown() seems to
                # cure it, but maybe using DisconnectEx() would be better.
                # A socket whose fileno() is -1 is already closed, so
                # shutdown() would only raise.
                if hasattr(sock, 'shutdown') and sock.fileno() != -1:
                    sock.shutdown(socket.SHUT_RDWR)
            finally:
                sock.close()
                self._sock = None
                server = self._server
                if server is not None:
                    server._detach()
                    self._server = None

    def get_write_buffer_size(self):
        """Return the byte count of the in-flight write plus the buffer."""
        size = self._pending_write
        if self._buffer is not None:
            size += len(self._buffer)
        return size
175
176
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                                 transports.ReadTransport):
    """Transport for read pipes.

    Reads are issued through the loop's proactor by _loop_reading(), which
    re-arms itself as the done callback of each read future.  While reading
    is paused, a completed chunk is parked in ``_pending_data`` and handed
    to the protocol by resume_reading().
    """

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the transport and schedule the first read."""
        # Data that completed while paused; None when nothing is parked.
        self._pending_data = None
        # Held True while the base constructor runs; cleared below once
        # the first _loop_reading() call has been scheduled.
        self._paused = True
        super().__init__(loop, sock, protocol, waiter, extra, server)

        self._loop.call_soon(self._loop_reading)
        self._paused = False

    def is_reading(self):
        """Return True if the transport is neither paused nor closing."""
        return not self._paused and not self._closing

    def pause_reading(self):
        """Stop delivering data to the protocol until resume_reading().

        Does nothing if the transport is closing or already paused.  The
        read that is currently in flight is left alone (see below).
        """
        if self._closing or self._paused:
            return
        self._paused = True

        # bpo-33694: Don't cancel self._read_fut because cancelling an
        # overlapped WSASend() silently loses data with the current
        # proactor implementation.
        #
        # If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend()
        # completed (even if HasOverlappedIoCompleted() returns 0), but
        # Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND
        # error. Once the overlapped is ignored, the IOCP loop will ignore
        # the completion I/O event and so not read the result of the
        # overlapped WSARecv().

        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume reading and deliver any chunk parked while paused.

        Does nothing if the transport is closing or not paused.
        """
        if self._closing or not self._paused:
            return

        self._paused = False
        if self._read_fut is None:
            # No read is in flight, so restart the read loop.
            self._loop.call_soon(self._loop_reading, None)

        data = self._pending_data
        self._pending_data = None
        if data is not None:
            # Call the protocol method after calling _loop_reading(),
            # since the protocol can decide to pause reading again.
            self._loop.call_soon(self._data_received, data)

        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _eof_received(self):
        """Tell the protocol about EOF and close unless it asks otherwise.

        An exception raised by ``protocol.eof_received()`` (other than
        SystemExit/KeyboardInterrupt, which propagate) is treated as a
        fatal error.
        """
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)

        try:
            keep_open = self._protocol.eof_received()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.eof_received() call failed.')
            return

        if not keep_open:
            self.close()

    def _data_received(self, data):
        """Hand a completed read to the protocol.

        While paused the chunk is parked in ``_pending_data`` instead.
        Empty data is treated as EOF.  BufferedProtocol instances are fed
        through ``protocols._feed_data_to_buffered_proto()``; any other
        protocol gets ``data_received(data)``.
        """
        if self._paused:
            # Don't call any protocol method while reading is paused.
            # The protocol will be called on resume_reading().
            assert self._pending_data is None
            self._pending_data = data
            return

        if not data:
            self._eof_received()
            return

        if isinstance(self._protocol, protocols.BufferedProtocol):
            try:
                protocols._feed_data_to_buffered_proto(self._protocol, data)
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc,
                                  'Fatal error: protocol.buffer_updated() '
                                  'call failed.')
                return
        else:
            self._protocol.data_received(data)

    def _loop_reading(self, fut=None):
        """Collect the result of a finished read and start the next one.

        *fut* is the read future when this runs as its done callback, and
        None when the method is called to start (or restart) reading.
        Data obtained from *fut* is delivered in the ``finally`` clause,
        i.e. after the next read has been issued.
        """
        data = None
        try:
            if fut is not None:
                assert self._read_fut is fut or (self._read_fut is None and
                                                 self._closing)
                self._read_fut = None
                if fut.done():
                    # deliver data later in "finally" clause
                    data = fut.result()
                else:
                    # the future will be replaced by next proactor.recv call
                    fut.cancel()

            if self._closing:
                # since close() has been called we ignore any read data
                data = None
                return

            if data == b'':
                # we got end-of-file so no need to reschedule a new read
                return

            # bpo-33694: buffer_updated() has currently no fast path because of
            # a data loss issue caused by overlapped WSASend() cancellation.

            if not self._paused:
                # reschedule a new read
                self._read_fut = self._loop._proactor.recv(self._sock, 32768)
        except ConnectionAbortedError as exc:
            if not self._closing:
                self._fatal_error(exc, 'Fatal read error on pipe transport')
            elif self._loop.get_debug():
                logger.debug("Read error on pipe transport while closing",
                             exc_info=True)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        except exceptions.CancelledError:
            # A cancelled read is only expected while closing.
            if not self._closing:
                raise
        else:
            # Only reached when the try block ran to its end (the early
            # returns above skip it); chain onto the read issued above.
            if not self._paused:
                self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if data is not None:
                self._data_received(data)
319
320
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    """Transport for write pipes.

    At most one proactor send is in flight at a time (``_write_fut``);
    data written meanwhile accumulates in ``_buffer`` and is sent by
    _loop_writing() once the current send completes.
    """

    _start_tls_compatible = True

    def __init__(self, *args, **kw):
        """Initialise the base transport and the empty-buffer waiter slot."""
        super().__init__(*args, **kw)
        # Future created by _make_empty_waiter(); None when not in use.
        self._empty_waiter = None

    def write(self, data):
        """Queue *data* for sending.

        Raises TypeError if *data* is not bytes, bytearray or memoryview,
        and RuntimeError after write_eof() or while an empty-buffer waiter
        is set.  Empty data is ignored.  Once the connection is lost the
        data is dropped and only a counter is incremented (with a warning
        after the configured threshold).
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(
                f"data argument must be a bytes-like object, "
                f"not {type(data).__name__}")
        if self._eof_written:
            raise RuntimeError('write_eof() already called')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')

        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()

    def _loop_writing(self, f=None, data=None):
        """Finish the previous send and start the next one, if any.

        Called directly by write() with *data* when idle, and as the done
        callback of the send future (*f*), in which case the buffered
        data is taken instead.
        """
        try:
            if f is not None and self._write_fut is None and self._closing:
                # XXX most likely self._force_close() has been called, and
                # it has set self._write_fut to None.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                # Re-raises the exception of a failed send, if any.
                f.result()
            if data is None:
                # Take ownership of everything buffered so far.
                data = self._buffer
                self._buffer = None
            if not data:
                # Nothing left to send: finish a pending close / EOF.
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
            if self._empty_waiter is not None and self._write_fut is None:
                # No send in flight any more: wake the empty-buffer waiter.
                self._empty_waiter.set_result(None)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

    def can_write_eof(self):
        """Return True: this transport implements write_eof()."""
        return True

    def write_eof(self):
        """Signal end of output by closing the transport."""
        self.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def _make_empty_waiter(self):
        """Create and return a future resolved when no send is in flight.

        The future is resolved immediately if the transport is idle.
        While the waiter is set, write() raises RuntimeError.  Raises
        RuntimeError if a waiter already exists.
        """
        if self._empty_waiter is not None:
            raise RuntimeError("Empty waiter is already set")
        self._empty_waiter = self._loop.create_future()
        if self._write_fut is None:
            self._empty_waiter.set_result(None)
        return self._empty_waiter

    def _reset_empty_waiter(self):
        """Drop the empty-buffer waiter so write() is allowed again."""
        self._empty_waiter = None
429
430
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
    """Write-only pipe transport that watches for the pipe being closed.

    A small read is kept pending on the pipe; its completion is handled
    by _pipe_closed().
    """

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        watcher = self._loop._proactor.recv(self._sock, 16)
        self._read_fut = watcher
        watcher.add_done_callback(self._pipe_closed)

    def _pipe_closed(self, fut):
        """Done callback of the watcher read issued in __init__()."""
        if fut.cancelled():
            # the transport has been closed
            return

        assert fut.result() == b''

        if self._closing:
            assert self._read_fut is None
            return

        assert fut is self._read_fut, (fut, self._read_fut)
        self._read_fut = None

        if self._write_fut is None:
            # Nothing in flight: an ordinary close is enough.
            self.close()
        else:
            # A write is still pending and can no longer succeed.
            self._force_close(BrokenPipeError())
451
452
class _ProactorDatagramTransport(_ProactorBasePipeTransport):
    """Transport for datagram sockets.

    Outgoing datagrams are queued as ``(data, addr)`` pairs in a deque
    and sent one at a time by _loop_writing(); incoming datagrams are
    received by _loop_reading() and passed to
    ``protocol.datagram_received()``.
    """

    # Buffer size passed to every recv()/recvfrom() call.
    max_size = 256 * 1024

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule the first read.

        When *address* is not None, plain send()/recv() are used and
        sendto() only accepts that address (or None).
        """
        self._address = address
        self._empty_waiter = None
        # We don't need to call _protocol.connection_made() since our base
        # constructor does it for us.
        super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)

        # The base constructor sets _buffer = None, so we set it here
        self._buffer = collections.deque()
        self._loop.call_soon(self._loop_reading)

    def _set_extra(self, sock):
        """Fill the extra-info mapping with socket details."""
        _set_socket_extra(self, sock)

    def get_write_buffer_size(self):
        """Return the total size in bytes of the queued datagrams."""
        return sum(len(data) for data, _ in self._buffer)

    def abort(self):
        """Close the transport immediately, discarding queued datagrams."""
        self._force_close(None)

    def sendto(self, data, addr=None):
        """Queue *data* for sending to *addr*.

        Raises TypeError if *data* is not bytes, bytearray or memoryview,
        and ValueError if the transport has a fixed address and *addr* is
        neither None nor that address.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # The message used to be passed as a format string plus a
            # separate argument, which TypeError never interpolates.
            raise TypeError(
                f'data argument must be bytes-like object ({type(data)!r})')

        if not data:
            return

        if self._address is not None and addr not in (None, self._address):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.sendto() raised exception.')
            self._conn_lost += 1
            return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))

        if self._write_fut is None:
            # No current write operations are active, kick one off
            self._loop_writing()
        # else: A write operation is already kicked off

        self._maybe_pause_protocol()

    def _loop_writing(self, fut=None):
        """Finish the previous send and start sending the next datagram.

        Called directly by sendto() when idle and as the done callback of
        the send future (*fut*).  An OSError is reported through
        ``protocol.error_received()``; any other exception is fatal.
        """
        try:
            # NOTE(review): close() increments _conn_lost as well, so once
            # close() has run this early return is taken and the
            # ``self._closing`` branch below looks unreachable -- confirm
            # whether a close() issued while sends are still pending ever
            # reaches _call_connection_lost().
            if self._conn_lost:
                return

            assert fut is self._write_fut
            self._write_fut = None
            if fut:
                # We are in a _loop_writing() done callback, get the result
                fut.result()

            if not self._buffer or (self._conn_lost and self._address):
                # The connection has been closed
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                return

            data, addr = self._buffer.popleft()
            if self._address is not None:
                self._write_fut = self._loop._proactor.send(self._sock,
                                                            data)
            else:
                self._write_fut = self._loop._proactor.sendto(self._sock,
                                                              data,
                                                              addr=addr)
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal write error on datagram transport')
        else:
            # Only reached when a new send was started above (the early
            # returns skip this clause).
            self._write_fut.add_done_callback(self._loop_writing)
            self._maybe_resume_protocol()

    def _loop_reading(self, fut=None):
        """Collect a finished receive and start the next one.

        *fut* is the receive future when this runs as its done callback
        and None for the initial call.  A received datagram is delivered
        to the protocol in the ``finally`` clause, after the next receive
        has been issued.
        """
        data = None
        try:
            if self._conn_lost:
                return

            assert self._read_fut is fut or (self._read_fut is None and
                                             self._closing)

            self._read_fut = None
            if fut is not None:
                res = fut.result()

                if self._closing:
                    # since close() has been called we ignore any read data
                    data = None
                    return

                if self._address is not None:
                    # recv() yields only the payload; the peer is fixed.
                    data, addr = res, self._address
                else:
                    data, addr = res

            if self._conn_lost:
                return
            if self._address is not None:
                self._read_fut = self._loop._proactor.recv(self._sock,
                                                           self.max_size)
            else:
                self._read_fut = self._loop._proactor.recvfrom(self._sock,
                                                               self.max_size)
        except OSError as exc:
            self._protocol.error_received(exc)
        except exceptions.CancelledError:
            # A cancelled receive is only expected while closing.
            if not self._closing:
                raise
        else:
            if self._read_fut is not None:
                self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if data:
                self._protocol.datagram_received(data, addr)
579
580
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
                                   _ProactorBaseWritePipeTransport,
                                   transports.Transport):
    """Transport for duplex pipes.

    Combines the read and write pipe transports; writing EOF is not
    available on this transport.
    """

    def write_eof(self):
        """Always raise NotImplementedError (see can_write_eof())."""
        raise NotImplementedError()

    def can_write_eof(self):
        """Return False: write_eof() is not implemented here."""
        return False
591
592
class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):
    """Transport for connected sockets."""

    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Create the transport, then apply ``_set_nodelay()`` to *sock*."""
        super().__init__(loop, sock, protocol, waiter, extra, server)
        base_events._set_nodelay(sock)

    def _set_extra(self, sock):
        """Fill the extra-info mapping with socket details."""
        _set_socket_extra(self, sock)

    def can_write_eof(self):
        """Return True: write_eof() is supported."""
        return True

    def write_eof(self):
        """Mark EOF as written and shut down the sending side.

        Does nothing when the transport is closing or EOF was already
        written.  While a write is in flight only the flag is set here.
        """
        if not (self._closing or self._eof_written):
            self._eof_written = True
            if self._write_fut is None:
                self._sock.shutdown(socket.SHUT_WR)
617
618
class BaseProactorEventLoop(base_events.BaseEventLoop):
    """Event loop that performs its I/O through a proactor object."""

    def __init__(self, proactor):
        """Bind *proactor* to this loop and create the self-pipe."""
        super().__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        # _selector is a convenient alias for the proactor.
        self._proactor = self._selector = proactor
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()
        in_main_thread = threading.current_thread() is threading.main_thread()
        if in_main_thread:
            # wakeup fd can only be installed to a file descriptor from
            # the main thread
            signal.set_wakeup_fd(self._csock.fileno())

    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        """Return a new socket transport for *sock*."""
        return _ProactorSocketTransport(
            self, sock, protocol, waiter, extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=None):
        """Wrap *rawsock* in an SSL protocol and return its app transport."""
        tls_proto = sslproto.SSLProtocol(
            self, protocol, sslcontext, waiter,
            server_side, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        # The raw socket transport feeds the SSL protocol; the caller gets
        # the application-level transport exposed by that protocol.
        _ProactorSocketTransport(self, rawsock, tls_proto,
                                 extra=extra, server=server)
        return tls_proto._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a new datagram transport for *sock*."""
        return _ProactorDatagramTransport(
            self, sock, protocol, address, waiter, extra)

    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
                                    extra=None):
        """Return a new duplex pipe transport for *sock*."""
        return _ProactorDuplexPipeTransport(
            self, sock, protocol, waiter, extra)

    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
                                  extra=None):
        """Return a new read pipe transport for *sock*."""
        return _ProactorReadPipeTransport(
            self, sock, protocol, waiter, extra)

    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
                                   extra=None):
        """Return a new write pipe transport for *sock*."""
        # We want connection_lost() to be called when other end closes
        return _ProactorWritePipeTransport(
            self, sock, protocol, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its proactor.

        Raises RuntimeError if the loop is running; does nothing if it
        is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return

        if threading.current_thread() is threading.main_thread():
            signal.set_wakeup_fd(-1)

        # Call these methods before closing the event loop (before calling
        # BaseEventLoop.close), because they can schedule callbacks with
        # call_soon(), which is forbidden when the event loop is closed.
        self._stop_accept_futures()
        self._close_self_pipe()
        self._proactor.close()
        self._proactor = self._selector = None

        # Close the event loop
        super().close()

    async def sock_recv(self, sock, n):
        """Receive up to *n* bytes from *sock* through the proactor."""
        received = await self._proactor.recv(sock, n)
        return received

    async def sock_recv_into(self, sock, buf):
        """Receive from *sock* into *buf* through the proactor."""
        outcome = await self._proactor.recv_into(sock, buf)
        return outcome

    async def sock_sendall(self, sock, data):
        """Send *data* on *sock* through the proactor."""
        outcome = await self._proactor.send(sock, data)
        return outcome

    async def sock_connect(self, sock, address):
        """Connect *sock* to *address* through the proactor."""
        outcome = await self._proactor.connect(sock, address)
        return outcome

    async def sock_accept(self, sock):
        """Accept a connection on *sock* through the proactor."""
        accepted = await self._proactor.accept(sock)
        return accepted

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with the proactor's sendfile().

        Sends at most *count* bytes (the rest of the file when *count* is
        falsy) starting at *offset* and returns the number of bytes sent.
        If anything was sent, the file position is moved to the end of
        the sent range.  Raises SendfileNotAvailableError when *file* has
        no usable file descriptor.
        """
        try:
            fd = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            file_size = os.fstat(fd).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")

        chunk = count if count else file_size
        if not chunk:
            return 0  # empty file

        # A single sendfile() call is capped at 0xffff_ffff bytes.
        chunk = min(chunk, 0xffff_ffff)
        if count:
            stop = min(offset + count, file_size)
        else:
            stop = file_size
        offset = min(offset, file_size)
        sent = 0
        try:
            chunk = min(stop - offset, chunk)
            while chunk > 0:
                await self._proactor.sendfile(sock, file, offset, chunk)
                offset += chunk
                sent += chunk
                chunk = min(stop - offset, chunk)
            return sent
        finally:
            if sent > 0:
                file.seek(offset)

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* through the socket of transport *transp*.

        Reading is paused and the transport's pending write is awaited
        first; afterwards the waiter is reset and reading is resumed if
        it was active before.
        """
        was_reading = transp.is_reading()
        transp.pause_reading()
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if was_reading:
                transp.resume_reading()

    def _close_self_pipe(self):
        """Cancel the self-pipe read and close both self-pipe sockets."""
        reader = self._self_reading_future
        if reader is not None:
            reader.cancel()
            self._self_reading_future = None
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking socket pair used to wake up the loop."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        for end in (self._ssock, self._csock):
            end.setblocking(False)
        self._internal_fds += 1

    def _loop_self_reading(self, f=None):
        """Keep a read pending on the self-pipe.

        Runs as the done callback of the previous read future *f* (or
        with None to start) and issues the next read.
        """
        try:
            if f is not None:
                f.result()  # may raise
            if self._self_reading_future is not f:
                # When we scheduled this Future, we assigned it to
                # _self_reading_future. If it's not there now, something has
                # tried to cancel the loop while this callback was still in the
                # queue (see windows_events.ProactorEventLoop.run_forever). In
                # that case stop here instead of continuing to schedule a new
                # iteration.
                return
            pending = self._proactor.recv(self._ssock, 4096)
        except exceptions.CancelledError:
            # _close_self_pipe() has been called, stop waiting for data
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self.call_exception_handler({
                'message': 'Error on reading from the event loop self pipe',
                'exception': exc,
                'loop': self,
            })
        else:
            self._self_reading_future = pending
            pending.add_done_callback(self._loop_self_reading)

    def _write_to_self(self):
        """Write a null byte to the self-pipe, ignoring send errors."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=None):
        """Start accepting connections on the listening socket *sock*.

        Each accepted connection gets a protocol from *protocol_factory*
        and a socket (or SSL) transport; the accept loop then re-arms
        itself until the loop is closed or accepting fails.
        """

        def loop(f=None):
            try:
                if f is not None:
                    client, peer = f.result()
                    if self._debug:
                        logger.debug("%r got a new connection from %r: %r",
                                     server, peer, client)
                    protocol = protocol_factory()
                    conn_extra = {'peername': peer}
                    if sslcontext is None:
                        self._make_socket_transport(
                            client, protocol,
                            extra=conn_extra, server=server)
                    else:
                        self._make_ssl_transport(
                            client, protocol, sslcontext, server_side=True,
                            extra=conn_extra, server=server,
                            ssl_handshake_timeout=ssl_handshake_timeout)
                if self.is_closed():
                    return
                accept_fut = self._proactor.accept(sock)
            except OSError as exc:
                if sock.fileno() != -1:
                    # The listening socket is still open: report and
                    # close it.
                    self.call_exception_handler({
                        'message': 'Accept failed on a socket',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    sock.close()
                elif self._debug:
                    logger.debug("Accept failed on socket %r",
                                 sock, exc_info=True)
            except exceptions.CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = accept_fut
                accept_fut.add_done_callback(loop)

        self.call_soon(loop)

    def _process_events(self, event_list):
        """Do nothing; see the comment below."""
        # Events are processed in the IocpProactor._poll() method
        pass

    def _stop_accept_futures(self):
        """Cancel and forget every pending accept future."""
        for pending in self._accept_futures.values():
            pending.cancel()
        self._accept_futures.clear()

    def _stop_serving(self, sock):
        """Stop accepting on *sock* and close it."""
        pending = self._accept_futures.pop(sock.fileno(), None)
        if pending:
            pending.cancel()
        self._proactor._stop_serving(sock)
        sock.close()
869