1"""Event loop using a proactor and related classes.
2
3A proactor is a "notify-on-completion" multiplexer.  Currently a
4proactor is only implemented on Windows with IOCP.
5"""
6
# Public API of this module: only the event loop base class is exported.
# Every transport class defined in this file is private (underscore-prefixed).
__all__ = ['BaseProactorEventLoop']
8
9import socket
10import sys
11import warnings
12
13from . import base_events
14from . import constants
15from . import futures
16from . import sslproto
17from . import transports
18from .log import logger
19
20
21class _ProactorBasePipeTransport(transports._FlowControlMixin,
22                                 transports.BaseTransport):
23    """Base class for pipe and socket transports."""
24
25    def __init__(self, loop, sock, protocol, waiter=None,
26                 extra=None, server=None):
27        super().__init__(extra, loop)
28        self._set_extra(sock)
29        self._sock = sock
30        self._protocol = protocol
31        self._server = server
32        self._buffer = None  # None or bytearray.
33        self._read_fut = None
34        self._write_fut = None
35        self._pending_write = 0
36        self._conn_lost = 0
37        self._closing = False  # Set when close() called.
38        self._eof_written = False
39        if self._server is not None:
40            self._server._attach()
41        self._loop.call_soon(self._protocol.connection_made, self)
42        if waiter is not None:
43            # only wake up the waiter when connection_made() has been called
44            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
45
46    def __repr__(self):
47        info = [self.__class__.__name__]
48        if self._sock is None:
49            info.append('closed')
50        elif self._closing:
51            info.append('closing')
52        if self._sock is not None:
53            info.append('fd=%s' % self._sock.fileno())
54        if self._read_fut is not None:
55            info.append('read=%s' % self._read_fut)
56        if self._write_fut is not None:
57            info.append("write=%r" % self._write_fut)
58        if self._buffer:
59            bufsize = len(self._buffer)
60            info.append('write_bufsize=%s' % bufsize)
61        if self._eof_written:
62            info.append('EOF written')
63        return '<%s>' % ' '.join(info)
64
65    def _set_extra(self, sock):
66        self._extra['pipe'] = sock
67
68    def close(self):
69        if self._closing:
70            return
71        self._closing = True
72        self._conn_lost += 1
73        if not self._buffer and self._write_fut is None:
74            self._loop.call_soon(self._call_connection_lost, None)
75        if self._read_fut is not None:
76            self._read_fut.cancel()
77            self._read_fut = None
78
79    # On Python 3.3 and older, objects with a destructor part of a reference
80    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
81    # to the PEP 442.
82    if sys.version_info >= (3, 4):
83        def __del__(self):
84            if self._sock is not None:
85                warnings.warn("unclosed transport %r" % self, ResourceWarning)
86                self.close()
87
88    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
89        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
90            if self._loop.get_debug():
91                logger.debug("%r: %s", self, message, exc_info=True)
92        else:
93            self._loop.call_exception_handler({
94                'message': message,
95                'exception': exc,
96                'transport': self,
97                'protocol': self._protocol,
98            })
99        self._force_close(exc)
100
101    def _force_close(self, exc):
102        if self._closing:
103            return
104        self._closing = True
105        self._conn_lost += 1
106        if self._write_fut:
107            self._write_fut.cancel()
108            self._write_fut = None
109        if self._read_fut:
110            self._read_fut.cancel()
111            self._read_fut = None
112        self._pending_write = 0
113        self._buffer = None
114        self._loop.call_soon(self._call_connection_lost, exc)
115
116    def _call_connection_lost(self, exc):
117        try:
118            self._protocol.connection_lost(exc)
119        finally:
120            # XXX If there is a pending overlapped read on the other
121            # end then it may fail with ERROR_NETNAME_DELETED if we
122            # just close our end.  First calling shutdown() seems to
123            # cure it, but maybe using DisconnectEx() would be better.
124            if hasattr(self._sock, 'shutdown'):
125                self._sock.shutdown(socket.SHUT_RDWR)
126            self._sock.close()
127            self._sock = None
128            server = self._server
129            if server is not None:
130                server._detach()
131                self._server = None
132
133    def get_write_buffer_size(self):
134        size = self._pending_write
135        if self._buffer is not None:
136            size += len(self._buffer)
137        return size
138
139
140class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
141                                 transports.ReadTransport):
142    """Transport for read pipes."""
143
144    def __init__(self, loop, sock, protocol, waiter=None,
145                 extra=None, server=None):
146        super().__init__(loop, sock, protocol, waiter, extra, server)
147        self._paused = False
148        self._loop.call_soon(self._loop_reading)
149
150    def pause_reading(self):
151        if self._closing:
152            raise RuntimeError('Cannot pause_reading() when closing')
153        if self._paused:
154            raise RuntimeError('Already paused')
155        self._paused = True
156        if self._loop.get_debug():
157            logger.debug("%r pauses reading", self)
158
159    def resume_reading(self):
160        if not self._paused:
161            raise RuntimeError('Not paused')
162        self._paused = False
163        if self._closing:
164            return
165        self._loop.call_soon(self._loop_reading, self._read_fut)
166        if self._loop.get_debug():
167            logger.debug("%r resumes reading", self)
168
169    def _loop_reading(self, fut=None):
170        if self._paused:
171            return
172        data = None
173
174        try:
175            if fut is not None:
176                assert self._read_fut is fut or (self._read_fut is None and
177                                                 self._closing)
178                self._read_fut = None
179                data = fut.result()  # deliver data later in "finally" clause
180
181            if self._closing:
182                # since close() has been called we ignore any read data
183                data = None
184                return
185
186            if data == b'':
187                # we got end-of-file so no need to reschedule a new read
188                return
189
190            # reschedule a new read
191            self._read_fut = self._loop._proactor.recv(self._sock, 4096)
192        except ConnectionAbortedError as exc:
193            if not self._closing:
194                self._fatal_error(exc, 'Fatal read error on pipe transport')
195            elif self._loop.get_debug():
196                logger.debug("Read error on pipe transport while closing",
197                             exc_info=True)
198        except ConnectionResetError as exc:
199            self._force_close(exc)
200        except OSError as exc:
201            self._fatal_error(exc, 'Fatal read error on pipe transport')
202        except futures.CancelledError:
203            if not self._closing:
204                raise
205        else:
206            self._read_fut.add_done_callback(self._loop_reading)
207        finally:
208            if data:
209                self._protocol.data_received(data)
210            elif data is not None:
211                if self._loop.get_debug():
212                    logger.debug("%r received EOF", self)
213                keep_open = self._protocol.eof_received()
214                if not keep_open:
215                    self.close()
216
217
218class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
219                                      transports.WriteTransport):
220    """Transport for write pipes."""
221
222    def write(self, data):
223        if not isinstance(data, (bytes, bytearray, memoryview)):
224            raise TypeError('data argument must be byte-ish (%r)',
225                            type(data))
226        if self._eof_written:
227            raise RuntimeError('write_eof() already called')
228
229        if not data:
230            return
231
232        if self._conn_lost:
233            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
234                logger.warning('socket.send() raised exception.')
235            self._conn_lost += 1
236            return
237
238        # Observable states:
239        # 1. IDLE: _write_fut and _buffer both None
240        # 2. WRITING: _write_fut set; _buffer None
241        # 3. BACKED UP: _write_fut set; _buffer a bytearray
242        # We always copy the data, so the caller can't modify it
243        # while we're still waiting for the I/O to happen.
244        if self._write_fut is None:  # IDLE -> WRITING
245            assert self._buffer is None
246            # Pass a copy, except if it's already immutable.
247            self._loop_writing(data=bytes(data))
248        elif not self._buffer:  # WRITING -> BACKED UP
249            # Make a mutable copy which we can extend.
250            self._buffer = bytearray(data)
251            self._maybe_pause_protocol()
252        else:  # BACKED UP
253            # Append to buffer (also copies).
254            self._buffer.extend(data)
255            self._maybe_pause_protocol()
256
257    def _loop_writing(self, f=None, data=None):
258        try:
259            assert f is self._write_fut
260            self._write_fut = None
261            self._pending_write = 0
262            if f:
263                f.result()
264            if data is None:
265                data = self._buffer
266                self._buffer = None
267            if not data:
268                if self._closing:
269                    self._loop.call_soon(self._call_connection_lost, None)
270                if self._eof_written:
271                    self._sock.shutdown(socket.SHUT_WR)
272                # Now that we've reduced the buffer size, tell the
273                # protocol to resume writing if it was paused.  Note that
274                # we do this last since the callback is called immediately
275                # and it may add more data to the buffer (even causing the
276                # protocol to be paused again).
277                self._maybe_resume_protocol()
278            else:
279                self._write_fut = self._loop._proactor.send(self._sock, data)
280                if not self._write_fut.done():
281                    assert self._pending_write == 0
282                    self._pending_write = len(data)
283                    self._write_fut.add_done_callback(self._loop_writing)
284                    self._maybe_pause_protocol()
285                else:
286                    self._write_fut.add_done_callback(self._loop_writing)
287        except ConnectionResetError as exc:
288            self._force_close(exc)
289        except OSError as exc:
290            self._fatal_error(exc, 'Fatal write error on pipe transport')
291
292    def can_write_eof(self):
293        return True
294
295    def write_eof(self):
296        self.close()
297
298    def abort(self):
299        self._force_close(None)
300
301
302class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
303    def __init__(self, *args, **kw):
304        super().__init__(*args, **kw)
305        self._read_fut = self._loop._proactor.recv(self._sock, 16)
306        self._read_fut.add_done_callback(self._pipe_closed)
307
308    def _pipe_closed(self, fut):
309        if fut.cancelled():
310            # the transport has been closed
311            return
312        assert fut.result() == b''
313        if self._closing:
314            assert self._read_fut is None
315            return
316        assert fut is self._read_fut, (fut, self._read_fut)
317        self._read_fut = None
318        if self._write_fut is not None:
319            self._force_close(BrokenPipeError())
320        else:
321            self.close()
322
323
324class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
325                                   _ProactorBaseWritePipeTransport,
326                                   transports.Transport):
327    """Transport for duplex pipes."""
328
329    def can_write_eof(self):
330        return False
331
332    def write_eof(self):
333        raise NotImplementedError
334
335
336class _ProactorSocketTransport(_ProactorReadPipeTransport,
337                               _ProactorBaseWritePipeTransport,
338                               transports.Transport):
339    """Transport for connected sockets."""
340
341    def _set_extra(self, sock):
342        self._extra['socket'] = sock
343        try:
344            self._extra['sockname'] = sock.getsockname()
345        except (socket.error, AttributeError):
346            if self._loop.get_debug():
347                logger.warning("getsockname() failed on %r",
348                             sock, exc_info=True)
349        if 'peername' not in self._extra:
350            try:
351                self._extra['peername'] = sock.getpeername()
352            except (socket.error, AttributeError):
353                if self._loop.get_debug():
354                    logger.warning("getpeername() failed on %r",
355                                   sock, exc_info=True)
356
357    def can_write_eof(self):
358        return True
359
360    def write_eof(self):
361        if self._closing or self._eof_written:
362            return
363        self._eof_written = True
364        if self._write_fut is None:
365            self._sock.shutdown(socket.SHUT_WR)
366
367
class BaseProactorEventLoop(base_events.BaseEventLoop):
    """Event loop whose I/O operations are delegated to a proactor.

    The proactor object is supplied by the caller; this class only relies
    on its recv(), send(), connect(), accept(), set_loop(), close() and
    _stop_serving() methods.  Subclasses must implement _socketpair().
    """

    def __init__(self, proactor):
        """Bind *proactor* to this loop and create the self-pipe."""
        super().__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        self._proactor = proactor
        self._selector = proactor   # convenient alias
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        """Return a new socket transport for *sock*."""
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            *, server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Return the application transport of a new SSL connection.

        Raises NotImplementedError when sslproto reports that it is not
        available.
        """
        if not sslproto._is_sslproto_available():
            raise NotImplementedError("Proactor event loop requires Python 3.5"
                                      " or newer (ssl.MemoryBIO) to support "
                                      "SSL")

        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
                                            server_side, server_hostname)
        # The raw socket transport is not kept here: it is created with
        # ssl_protocol as its protocol, and the caller gets the transport
        # exposed by ssl_protocol instead.
        _ProactorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
                                    extra=None):
        """Return a new duplex pipe transport for *sock*."""
        return _ProactorDuplexPipeTransport(self,
                                            sock, protocol, waiter, extra)

    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
                                  extra=None):
        """Return a new read pipe transport for *sock*."""
        return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)

    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
                                   extra=None):
        """Return a new write pipe transport for *sock*."""
        # We want connection_lost() to be called when other end closes
        return _ProactorWritePipeTransport(self,
                                           sock, protocol, waiter, extra)

    def close(self):
        """Close the loop, its accept futures, self-pipe and proactor.

        Raises RuntimeError if the loop is running; closing an already
        closed loop is a no-op.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return

        # Call these methods before closing the event loop (before calling
        # BaseEventLoop.close), because they can schedule callbacks with
        # call_soon(), which is forbidden when the event loop is closed.
        self._stop_accept_futures()
        self._close_self_pipe()
        self._proactor.close()
        self._proactor = None
        self._selector = None

        # Close the event loop
        super().close()

    def sock_recv(self, sock, n):
        """Return the proactor future receiving up to *n* bytes."""
        return self._proactor.recv(sock, n)

    def sock_sendall(self, sock, data):
        """Return the proactor future sending *data* on *sock*."""
        return self._proactor.send(sock, data)

    def sock_connect(self, sock, address):
        """Return a future connecting *sock* to *address*.

        In debug mode the address is validated first; a ValueError from
        that check is returned as the exception of an already completed
        future instead of being raised.
        """
        try:
            if self._debug:
                base_events._check_resolved_address(sock, address)
        except ValueError as err:
            fut = futures.Future(loop=self)
            fut.set_exception(err)
            return fut
        else:
            return self._proactor.connect(sock, address)

    def sock_accept(self, sock):
        """Return the proactor future accepting a connection on *sock*."""
        return self._proactor.accept(sock)

    def _socketpair(self):
        """Return a pair of connected sockets (provided by subclasses)."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Cancel the pending self-pipe read and close both sockets."""
        if self._self_reading_future is not None:
            self._self_reading_future.cancel()
            self._self_reading_future = None
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and schedule its first read."""
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self.call_soon(self._loop_self_reading)

    def _loop_self_reading(self, f=None):
        """Keep a read pending on the self-pipe.

        Called once to start, then as the done callback of each read.  The
        received bytes are discarded.  Reading stops when the read was
        cancelled or failed; failures are passed to the exception handler.
        """
        try:
            if f is not None:
                f.result()  # may raise
            f = self._proactor.recv(self._ssock, 4096)
        except futures.CancelledError:
            # _close_self_pipe() has been called, stop waiting for data
            return
        except Exception as exc:
            self.call_exception_handler({
                'message': 'Error on reading from the event loop self pipe',
                'exception': exc,
                'loop': self,
            })
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)

    def _write_to_self(self):
        """Write one null byte to the self-pipe.

        Does nothing once the self-pipe has been closed.  A failing send
        is not fatal and is only logged in debug mode.
        NOTE(review): presumably called from other threads to wake the
        loop up -- confirm against the base class.
        """
        # Work on a local reference: _close_self_pipe() resets the
        # attribute to None.
        csock = self._csock
        if csock is None:
            return
        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None):
        """Start accepting connections on the listening socket *sock*.

        For each accepted connection a protocol is created with
        *protocol_factory* and wrapped in a socket transport, or an SSL
        transport when *sslcontext* is given.  The pending accept future is
        recorded in _accept_futures under the file descriptor of *sock*.
        """

        def loop(f=None):
            # Called once to start accepting, then as the done callback of
            # each accept future.
            try:
                if f is not None:
                    conn, addr = f.result()
                    if self._debug:
                        logger.debug("%r got a new connection from %r: %r",
                                     server, addr, conn)
                    protocol = protocol_factory()
                    if sslcontext is not None:
                        self._make_ssl_transport(
                            conn, protocol, sslcontext, server_side=True,
                            extra={'peername': addr}, server=server)
                    else:
                        self._make_socket_transport(
                            conn, protocol,
                            extra={'peername': addr}, server=server)
                if self.is_closed():
                    return
                f = self._proactor.accept(sock)
            except OSError as exc:
                if sock.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Accept failed on a socket',
                        'exception': exc,
                        'socket': sock,
                    })
                    sock.close()
                elif self._debug:
                    logger.debug("Accept failed on socket %r",
                                 sock, exc_info=True)
            except futures.CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = f
                f.add_done_callback(loop)

        self.call_soon(loop)

    def _process_events(self, event_list):
        """Do nothing: events are handled by the proactor itself."""
        # Events are processed in the IocpProactor._poll() method
        pass

    def _stop_accept_futures(self):
        """Cancel and forget the pending accept future of every socket."""
        for future in self._accept_futures.values():
            future.cancel()
        self._accept_futures.clear()

    def _stop_serving(self, sock):
        """Stop accepting connections on *sock* and close it.

        Only the accept future registered for *sock* is cancelled; other
        listening sockets keep serving.
        """
        future = self._accept_futures.pop(sock.fileno(), None)
        if future is not None:
            future.cancel()
        self._proactor._stop_serving(sock)
        sock.close()
548