1"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer.  Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
7__all__ = 'BaseProactorEventLoop',
8
9import io
10import os
11import socket
12import warnings
13
14from . import base_events
15from . import constants
16from . import events
17from . import futures
18from . import protocols
19from . import sslproto
20from . import transports
21from .log import logger
22
23
24class _ProactorBasePipeTransport(transports._FlowControlMixin,
25                                 transports.BaseTransport):
26    """Base class for pipe and socket transports."""
27
28    def __init__(self, loop, sock, protocol, waiter=None,
29                 extra=None, server=None):
30        super().__init__(extra, loop)
31        self._set_extra(sock)
32        self._sock = sock
33        self.set_protocol(protocol)
34        self._server = server
35        self._buffer = None  # None or bytearray.
36        self._read_fut = None
37        self._write_fut = None
38        self._pending_write = 0
39        self._conn_lost = 0
40        self._closing = False  # Set when close() called.
41        self._eof_written = False
42        if self._server is not None:
43            self._server._attach()
44        self._loop.call_soon(self._protocol.connection_made, self)
45        if waiter is not None:
46            # only wake up the waiter when connection_made() has been called
47            self._loop.call_soon(futures._set_result_unless_cancelled,
48                                 waiter, None)
49
50    def __repr__(self):
51        info = [self.__class__.__name__]
52        if self._sock is None:
53            info.append('closed')
54        elif self._closing:
55            info.append('closing')
56        if self._sock is not None:
57            info.append(f'fd={self._sock.fileno()}')
58        if self._read_fut is not None:
59            info.append(f'read={self._read_fut!r}')
60        if self._write_fut is not None:
61            info.append(f'write={self._write_fut!r}')
62        if self._buffer:
63            info.append(f'write_bufsize={len(self._buffer)}')
64        if self._eof_written:
65            info.append('EOF written')
66        return '<{}>'.format(' '.join(info))
67
68    def _set_extra(self, sock):
69        self._extra['pipe'] = sock
70
71    def set_protocol(self, protocol):
72        self._protocol = protocol
73
74    def get_protocol(self):
75        return self._protocol
76
77    def is_closing(self):
78        return self._closing
79
80    def close(self):
81        if self._closing:
82            return
83        self._closing = True
84        self._conn_lost += 1
85        if not self._buffer and self._write_fut is None:
86            self._loop.call_soon(self._call_connection_lost, None)
87        if self._read_fut is not None:
88            self._read_fut.cancel()
89            self._read_fut = None
90
91    def __del__(self):
92        if self._sock is not None:
93            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
94                          source=self)
95            self.close()
96
97    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
98        try:
99            if isinstance(exc, OSError):
100                if self._loop.get_debug():
101                    logger.debug("%r: %s", self, message, exc_info=True)
102            else:
103                self._loop.call_exception_handler({
104                    'message': message,
105                    'exception': exc,
106                    'transport': self,
107                    'protocol': self._protocol,
108                })
109        finally:
110            self._force_close(exc)
111
112    def _force_close(self, exc):
113        if self._empty_waiter is not None and not self._empty_waiter.done():
114            if exc is None:
115                self._empty_waiter.set_result(None)
116            else:
117                self._empty_waiter.set_exception(exc)
118        if self._closing:
119            return
120        self._closing = True
121        self._conn_lost += 1
122        if self._write_fut:
123            self._write_fut.cancel()
124            self._write_fut = None
125        if self._read_fut:
126            self._read_fut.cancel()
127            self._read_fut = None
128        self._pending_write = 0
129        self._buffer = None
130        self._loop.call_soon(self._call_connection_lost, exc)
131
132    def _call_connection_lost(self, exc):
133        try:
134            self._protocol.connection_lost(exc)
135        finally:
136            # XXX If there is a pending overlapped read on the other
137            # end then it may fail with ERROR_NETNAME_DELETED if we
138            # just close our end.  First calling shutdown() seems to
139            # cure it, but maybe using DisconnectEx() would be better.
140            if hasattr(self._sock, 'shutdown'):
141                self._sock.shutdown(socket.SHUT_RDWR)
142            self._sock.close()
143            self._sock = None
144            server = self._server
145            if server is not None:
146                server._detach()
147                self._server = None
148
149    def get_write_buffer_size(self):
150        size = self._pending_write
151        if self._buffer is not None:
152            size += len(self._buffer)
153        return size
154
155
156class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
157                                 transports.ReadTransport):
158    """Transport for read pipes."""
159
160    def __init__(self, loop, sock, protocol, waiter=None,
161                 extra=None, server=None):
162        self._pending_data = None
163        self._paused = True
164        super().__init__(loop, sock, protocol, waiter, extra, server)
165
166        self._loop.call_soon(self._loop_reading)
167        self._paused = False
168
169    def is_reading(self):
170        return not self._paused and not self._closing
171
172    def pause_reading(self):
173        if self._closing or self._paused:
174            return
175        self._paused = True
176
177        # bpo-33694: Don't cancel self._read_fut because cancelling an
178        # overlapped WSASend() loss silently data with the current proactor
179        # implementation.
180        #
181        # If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend()
182        # completed (even if HasOverlappedIoCompleted() returns 0), but
183        # Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND
184        # error. Once the overlapped is ignored, the IOCP loop will ignores the
185        # completion I/O event and so not read the result of the overlapped
186        # WSARecv().
187
188        if self._loop.get_debug():
189            logger.debug("%r pauses reading", self)
190
191    def resume_reading(self):
192        if self._closing or not self._paused:
193            return
194
195        self._paused = False
196        if self._read_fut is None:
197            self._loop.call_soon(self._loop_reading, None)
198
199        data = self._pending_data
200        self._pending_data = None
201        if data is not None:
202            # Call the protocol methode after calling _loop_reading(),
203            # since the protocol can decide to pause reading again.
204            self._loop.call_soon(self._data_received, data)
205
206        if self._loop.get_debug():
207            logger.debug("%r resumes reading", self)
208
209    def _eof_received(self):
210        if self._loop.get_debug():
211            logger.debug("%r received EOF", self)
212
213        try:
214            keep_open = self._protocol.eof_received()
215        except Exception as exc:
216            self._fatal_error(
217                exc, 'Fatal error: protocol.eof_received() call failed.')
218            return
219
220        if not keep_open:
221            self.close()
222
223    def _data_received(self, data):
224        if self._paused:
225            # Don't call any protocol method while reading is paused.
226            # The protocol will be called on resume_reading().
227            assert self._pending_data is None
228            self._pending_data = data
229            return
230
231        if not data:
232            self._eof_received()
233            return
234
235        if isinstance(self._protocol, protocols.BufferedProtocol):
236            try:
237                protocols._feed_data_to_buffered_proto(self._protocol, data)
238            except Exception as exc:
239                self._fatal_error(exc,
240                                  'Fatal error: protocol.buffer_updated() '
241                                  'call failed.')
242                return
243        else:
244            self._protocol.data_received(data)
245
246    def _loop_reading(self, fut=None):
247        data = None
248        try:
249            if fut is not None:
250                assert self._read_fut is fut or (self._read_fut is None and
251                                                 self._closing)
252                self._read_fut = None
253                if fut.done():
254                    # deliver data later in "finally" clause
255                    data = fut.result()
256                else:
257                    # the future will be replaced by next proactor.recv call
258                    fut.cancel()
259
260            if self._closing:
261                # since close() has been called we ignore any read data
262                data = None
263                return
264
265            if data == b'':
266                # we got end-of-file so no need to reschedule a new read
267                return
268
269            # bpo-33694: buffer_updated() has currently no fast path because of
270            # a data loss issue caused by overlapped WSASend() cancellation.
271
272            if not self._paused:
273                # reschedule a new read
274                self._read_fut = self._loop._proactor.recv(self._sock, 32768)
275        except ConnectionAbortedError as exc:
276            if not self._closing:
277                self._fatal_error(exc, 'Fatal read error on pipe transport')
278            elif self._loop.get_debug():
279                logger.debug("Read error on pipe transport while closing",
280                             exc_info=True)
281        except ConnectionResetError as exc:
282            self._force_close(exc)
283        except OSError as exc:
284            self._fatal_error(exc, 'Fatal read error on pipe transport')
285        except futures.CancelledError:
286            if not self._closing:
287                raise
288        else:
289            if not self._paused:
290                self._read_fut.add_done_callback(self._loop_reading)
291        finally:
292            if data is not None:
293                self._data_received(data)
294
295
296class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
297                                      transports.WriteTransport):
298    """Transport for write pipes."""
299
300    _start_tls_compatible = True
301
302    def __init__(self, *args, **kw):
303        super().__init__(*args, **kw)
304        self._empty_waiter = None
305
306    def write(self, data):
307        if not isinstance(data, (bytes, bytearray, memoryview)):
308            raise TypeError(
309                f"data argument must be a bytes-like object, "
310                f"not {type(data).__name__}")
311        if self._eof_written:
312            raise RuntimeError('write_eof() already called')
313        if self._empty_waiter is not None:
314            raise RuntimeError('unable to write; sendfile is in progress')
315
316        if not data:
317            return
318
319        if self._conn_lost:
320            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
321                logger.warning('socket.send() raised exception.')
322            self._conn_lost += 1
323            return
324
325        # Observable states:
326        # 1. IDLE: _write_fut and _buffer both None
327        # 2. WRITING: _write_fut set; _buffer None
328        # 3. BACKED UP: _write_fut set; _buffer a bytearray
329        # We always copy the data, so the caller can't modify it
330        # while we're still waiting for the I/O to happen.
331        if self._write_fut is None:  # IDLE -> WRITING
332            assert self._buffer is None
333            # Pass a copy, except if it's already immutable.
334            self._loop_writing(data=bytes(data))
335        elif not self._buffer:  # WRITING -> BACKED UP
336            # Make a mutable copy which we can extend.
337            self._buffer = bytearray(data)
338            self._maybe_pause_protocol()
339        else:  # BACKED UP
340            # Append to buffer (also copies).
341            self._buffer.extend(data)
342            self._maybe_pause_protocol()
343
344    def _loop_writing(self, f=None, data=None):
345        try:
346            if f is not None and self._write_fut is None and self._closing:
347                # XXX most likely self._force_close() has been called, and
348                # it has set self._write_fut to None.
349                return
350            assert f is self._write_fut
351            self._write_fut = None
352            self._pending_write = 0
353            if f:
354                f.result()
355            if data is None:
356                data = self._buffer
357                self._buffer = None
358            if not data:
359                if self._closing:
360                    self._loop.call_soon(self._call_connection_lost, None)
361                if self._eof_written:
362                    self._sock.shutdown(socket.SHUT_WR)
363                # Now that we've reduced the buffer size, tell the
364                # protocol to resume writing if it was paused.  Note that
365                # we do this last since the callback is called immediately
366                # and it may add more data to the buffer (even causing the
367                # protocol to be paused again).
368                self._maybe_resume_protocol()
369            else:
370                self._write_fut = self._loop._proactor.send(self._sock, data)
371                if not self._write_fut.done():
372                    assert self._pending_write == 0
373                    self._pending_write = len(data)
374                    self._write_fut.add_done_callback(self._loop_writing)
375                    self._maybe_pause_protocol()
376                else:
377                    self._write_fut.add_done_callback(self._loop_writing)
378            if self._empty_waiter is not None and self._write_fut is None:
379                self._empty_waiter.set_result(None)
380        except ConnectionResetError as exc:
381            self._force_close(exc)
382        except OSError as exc:
383            self._fatal_error(exc, 'Fatal write error on pipe transport')
384
385    def can_write_eof(self):
386        return True
387
388    def write_eof(self):
389        self.close()
390
391    def abort(self):
392        self._force_close(None)
393
394    def _make_empty_waiter(self):
395        if self._empty_waiter is not None:
396            raise RuntimeError("Empty waiter is already set")
397        self._empty_waiter = self._loop.create_future()
398        if self._write_fut is None:
399            self._empty_waiter.set_result(None)
400        return self._empty_waiter
401
402    def _reset_empty_waiter(self):
403        self._empty_waiter = None
404
405
406class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
407    def __init__(self, *args, **kw):
408        super().__init__(*args, **kw)
409        self._read_fut = self._loop._proactor.recv(self._sock, 16)
410        self._read_fut.add_done_callback(self._pipe_closed)
411
412    def _pipe_closed(self, fut):
413        if fut.cancelled():
414            # the transport has been closed
415            return
416        assert fut.result() == b''
417        if self._closing:
418            assert self._read_fut is None
419            return
420        assert fut is self._read_fut, (fut, self._read_fut)
421        self._read_fut = None
422        if self._write_fut is not None:
423            self._force_close(BrokenPipeError())
424        else:
425            self.close()
426
427
428class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
429                                   _ProactorBaseWritePipeTransport,
430                                   transports.Transport):
431    """Transport for duplex pipes."""
432
433    def can_write_eof(self):
434        return False
435
436    def write_eof(self):
437        raise NotImplementedError
438
439
440class _ProactorSocketTransport(_ProactorReadPipeTransport,
441                               _ProactorBaseWritePipeTransport,
442                               transports.Transport):
443    """Transport for connected sockets."""
444
445    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
446
447    def __init__(self, loop, sock, protocol, waiter=None,
448                 extra=None, server=None):
449        super().__init__(loop, sock, protocol, waiter, extra, server)
450        base_events._set_nodelay(sock)
451
452    def _set_extra(self, sock):
453        self._extra['socket'] = sock
454
455        try:
456            self._extra['sockname'] = sock.getsockname()
457        except (socket.error, AttributeError):
458            if self._loop.get_debug():
459                logger.warning(
460                    "getsockname() failed on %r", sock, exc_info=True)
461
462        if 'peername' not in self._extra:
463            try:
464                self._extra['peername'] = sock.getpeername()
465            except (socket.error, AttributeError):
466                if self._loop.get_debug():
467                    logger.warning("getpeername() failed on %r",
468                                   sock, exc_info=True)
469
470    def can_write_eof(self):
471        return True
472
473    def write_eof(self):
474        if self._closing or self._eof_written:
475            return
476        self._eof_written = True
477        if self._write_fut is None:
478            self._sock.shutdown(socket.SHUT_WR)
479
480
481class BaseProactorEventLoop(base_events.BaseEventLoop):
482
483    def __init__(self, proactor):
484        super().__init__()
485        logger.debug('Using proactor: %s', proactor.__class__.__name__)
486        self._proactor = proactor
487        self._selector = proactor   # convenient alias
488        self._self_reading_future = None
489        self._accept_futures = {}   # socket file descriptor => Future
490        proactor.set_loop(self)
491        self._make_self_pipe()
492
493    def _make_socket_transport(self, sock, protocol, waiter=None,
494                               extra=None, server=None):
495        return _ProactorSocketTransport(self, sock, protocol, waiter,
496                                        extra, server)
497
498    def _make_ssl_transport(
499            self, rawsock, protocol, sslcontext, waiter=None,
500            *, server_side=False, server_hostname=None,
501            extra=None, server=None,
502            ssl_handshake_timeout=None):
503        ssl_protocol = sslproto.SSLProtocol(
504                self, protocol, sslcontext, waiter,
505                server_side, server_hostname,
506                ssl_handshake_timeout=ssl_handshake_timeout)
507        _ProactorSocketTransport(self, rawsock, ssl_protocol,
508                                 extra=extra, server=server)
509        return ssl_protocol._app_transport
510
511    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
512                                    extra=None):
513        return _ProactorDuplexPipeTransport(self,
514                                            sock, protocol, waiter, extra)
515
516    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
517                                  extra=None):
518        return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
519
520    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
521                                   extra=None):
522        # We want connection_lost() to be called when other end closes
523        return _ProactorWritePipeTransport(self,
524                                           sock, protocol, waiter, extra)
525
526    def close(self):
527        if self.is_running():
528            raise RuntimeError("Cannot close a running event loop")
529        if self.is_closed():
530            return
531
532        # Call these methods before closing the event loop (before calling
533        # BaseEventLoop.close), because they can schedule callbacks with
534        # call_soon(), which is forbidden when the event loop is closed.
535        self._stop_accept_futures()
536        self._close_self_pipe()
537        self._proactor.close()
538        self._proactor = None
539        self._selector = None
540
541        # Close the event loop
542        super().close()
543
544    async def sock_recv(self, sock, n):
545        return await self._proactor.recv(sock, n)
546
547    async def sock_recv_into(self, sock, buf):
548        return await self._proactor.recv_into(sock, buf)
549
550    async def sock_sendall(self, sock, data):
551        return await self._proactor.send(sock, data)
552
553    async def sock_connect(self, sock, address):
554        return await self._proactor.connect(sock, address)
555
556    async def sock_accept(self, sock):
557        return await self._proactor.accept(sock)
558
559    async def _sock_sendfile_native(self, sock, file, offset, count):
560        try:
561            fileno = file.fileno()
562        except (AttributeError, io.UnsupportedOperation) as err:
563            raise events.SendfileNotAvailableError("not a regular file")
564        try:
565            fsize = os.fstat(fileno).st_size
566        except OSError as err:
567            raise events.SendfileNotAvailableError("not a regular file")
568        blocksize = count if count else fsize
569        if not blocksize:
570            return 0  # empty file
571
572        blocksize = min(blocksize, 0xffff_ffff)
573        end_pos = min(offset + count, fsize) if count else fsize
574        offset = min(offset, fsize)
575        total_sent = 0
576        try:
577            while True:
578                blocksize = min(end_pos - offset, blocksize)
579                if blocksize <= 0:
580                    return total_sent
581                await self._proactor.sendfile(sock, file, offset, blocksize)
582                offset += blocksize
583                total_sent += blocksize
584        finally:
585            if total_sent > 0:
586                file.seek(offset)
587
588    async def _sendfile_native(self, transp, file, offset, count):
589        resume_reading = transp.is_reading()
590        transp.pause_reading()
591        await transp._make_empty_waiter()
592        try:
593            return await self.sock_sendfile(transp._sock, file, offset, count,
594                                            fallback=False)
595        finally:
596            transp._reset_empty_waiter()
597            if resume_reading:
598                transp.resume_reading()
599
600    def _close_self_pipe(self):
601        if self._self_reading_future is not None:
602            self._self_reading_future.cancel()
603            self._self_reading_future = None
604        self._ssock.close()
605        self._ssock = None
606        self._csock.close()
607        self._csock = None
608        self._internal_fds -= 1
609
610    def _make_self_pipe(self):
611        # A self-socket, really. :-)
612        self._ssock, self._csock = socket.socketpair()
613        self._ssock.setblocking(False)
614        self._csock.setblocking(False)
615        self._internal_fds += 1
616        self.call_soon(self._loop_self_reading)
617
618    def _loop_self_reading(self, f=None):
619        try:
620            if f is not None:
621                f.result()  # may raise
622            f = self._proactor.recv(self._ssock, 4096)
623        except futures.CancelledError:
624            # _close_self_pipe() has been called, stop waiting for data
625            return
626        except Exception as exc:
627            self.call_exception_handler({
628                'message': 'Error on reading from the event loop self pipe',
629                'exception': exc,
630                'loop': self,
631            })
632        else:
633            self._self_reading_future = f
634            f.add_done_callback(self._loop_self_reading)
635
636    def _write_to_self(self):
637        try:
638            self._csock.send(b'\0')
639        except OSError:
640            if self._debug:
641                logger.debug("Fail to write a null byte into the "
642                             "self-pipe socket",
643                             exc_info=True)
644
645    def _start_serving(self, protocol_factory, sock,
646                       sslcontext=None, server=None, backlog=100,
647                       ssl_handshake_timeout=None):
648
649        def loop(f=None):
650            try:
651                if f is not None:
652                    conn, addr = f.result()
653                    if self._debug:
654                        logger.debug("%r got a new connection from %r: %r",
655                                     server, addr, conn)
656                    protocol = protocol_factory()
657                    if sslcontext is not None:
658                        self._make_ssl_transport(
659                            conn, protocol, sslcontext, server_side=True,
660                            extra={'peername': addr}, server=server,
661                            ssl_handshake_timeout=ssl_handshake_timeout)
662                    else:
663                        self._make_socket_transport(
664                            conn, protocol,
665                            extra={'peername': addr}, server=server)
666                if self.is_closed():
667                    return
668                f = self._proactor.accept(sock)
669            except OSError as exc:
670                if sock.fileno() != -1:
671                    self.call_exception_handler({
672                        'message': 'Accept failed on a socket',
673                        'exception': exc,
674                        'socket': sock,
675                    })
676                    sock.close()
677                elif self._debug:
678                    logger.debug("Accept failed on socket %r",
679                                 sock, exc_info=True)
680            except futures.CancelledError:
681                sock.close()
682            else:
683                self._accept_futures[sock.fileno()] = f
684                f.add_done_callback(loop)
685
686        self.call_soon(loop)
687
688    def _process_events(self, event_list):
689        # Events are processed in the IocpProactor._poll() method
690        pass
691
692    def _stop_accept_futures(self):
693        for future in self._accept_futures.values():
694            future.cancel()
695        self._accept_futures.clear()
696
697    def _stop_serving(self, sock):
698        future = self._accept_futures.pop(sock.fileno(), None)
699        if future:
700            future.cancel()
701        self._proactor._stop_serving(sock)
702        sock.close()
703