import sys
from asyncio import transports
from typing import Optional, Tuple, Union
4
class BaseProtocol:
    """Signature-only declaration of the base protocol callbacks.

    Every method body is a bare ``...`` placeholder, so nothing here carries
    runtime logic; the class only describes call signatures.

    NOTE(review): the names and shapes match asyncio's ``BaseProtocol`` --
    confirm this file is intended as its type stub.
    """

    def connection_made(self, transport: transports.BaseTransport) -> None:
        """Accept the ``transport`` object handed to the protocol."""
        ...

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Accept ``exc``, which is either an exception instance or ``None``."""
        ...

    def pause_writing(self) -> None:
        """Take no arguments and return nothing."""
        ...

    def resume_writing(self) -> None:
        """Take no arguments and return nothing."""
        ...
10
class Protocol(BaseProtocol):
    """Signature-only declaration of the byte-stream protocol callbacks.

    Extends ``BaseProtocol`` with two methods; both bodies are ``...``
    placeholders and carry no runtime logic.
    """

    def data_received(self, data: bytes) -> None:
        """Accept a ``bytes`` payload; nothing is returned."""
        ...

    def eof_received(self) -> Optional[bool]:
        """Take no arguments; the declared result is a ``bool`` or ``None``."""
        ...
14
# BufferedProtocol is only declared when the interpreter is 3.7 or newer.
if sys.version_info >= (3, 7):
    class BufferedProtocol(BaseProtocol):
        """Signature-only declaration of the buffer-based protocol callbacks.

        Both method bodies are ``...`` placeholders with no runtime logic.
        """

        # NOTE(review): asyncio's documentation describes the returned buffer
        # as any object implementing the buffer protocol; ``bytearray`` may be
        # narrower than necessary -- confirm before widening.
        def get_buffer(self, sizehint: int) -> bytearray:
            """Accept an integer ``sizehint``; declared to return a ``bytearray``."""
            ...

        def buffer_updated(self, nbytes: int) -> None:
            """Accept the integer ``nbytes``; nothing is returned."""
            ...
19
class DatagramProtocol(BaseProtocol):
    """Signature-only declaration of the datagram protocol callbacks.

    Both method bodies are ``...`` placeholders with no runtime logic.
    """

    # NOTE(review): ``addr`` is typed as a (str, int) pair; other address
    # families may deliver a different shape -- confirm against the asyncio
    # documentation before relying on this annotation.
    def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
        """Accept a ``bytes`` payload together with its ``addr`` pair."""
        ...

    def error_received(self, exc: Exception) -> None:
        """Accept an exception instance; nothing is returned."""
        ...
23
class SubprocessProtocol(BaseProtocol):
    """Signature-only declaration of the subprocess protocol callbacks.

    All method bodies are ``...`` placeholders with no runtime logic.
    """

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        """Accept an integer ``fd`` and a ``bytes`` payload."""
        ...

    def pipe_connection_lost(self, fd: int, exc: Optional[Exception]) -> None:
        """Accept an integer ``fd`` and ``exc`` (an exception or ``None``)."""
        ...

    def process_exited(self) -> None:
        """Take no arguments and return nothing."""
        ...
28