1import sys 2from asyncio import transports 3from typing import Optional, Tuple, Union 4 5class BaseProtocol: 6 def connection_made(self, transport: transports.BaseTransport) -> None: ... 7 def connection_lost(self, exc: Optional[Exception]) -> None: ... 8 def pause_writing(self) -> None: ... 9 def resume_writing(self) -> None: ... 10 11class Protocol(BaseProtocol): 12 def data_received(self, data: bytes) -> None: ... 13 def eof_received(self) -> Optional[bool]: ... 14 15if sys.version_info >= (3, 7): 16 class BufferedProtocol(BaseProtocol): 17 def get_buffer(self, sizehint: int) -> bytearray: ... 18 def buffer_updated(self, nbytes: int) -> None: ... 19 20class DatagramProtocol(BaseProtocol): 21 def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: ... 22 def error_received(self, exc: Exception) -> None: ... 23 24class SubprocessProtocol(BaseProtocol): 25 def pipe_data_received(self, fd: int, data: bytes) -> None: ... 26 def pipe_connection_lost(self, fd: int, exc: Optional[Exception]) -> None: ... 27 def process_exited(self) -> None: ... 28