1from typing import Tuple, Union, Optional, Any, Dict, overload
2
3import os
4import select
5import sys
6import time
7import warnings
8from socket import SocketType
9from typing import Optional
10
11from errno import (EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL,
12                   ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED,
13                   EPIPE, EAGAIN, errorcode)
14
15# cyclic dependence with asynchat
16_maptype = Dict[int, Any]
17
18socket_map: _maptype = ...  # Undocumented
19
20class ExitNow(Exception): ...
21
22def read(obj: Any) -> None: ...
23def write(obj: Any) -> None: ...
24def readwrite(obj: Any, flags: int) -> None: ...
25def poll(timeout: float = ..., map: _maptype = ...) -> None: ...
26def poll2(timeout: float = ..., map: _maptype = ...) -> None: ...
27
28poll3 = poll2
29
30def loop(timeout: float = ..., use_poll: bool = ..., map: _maptype = ..., count: Optional[int] = ...) -> None: ...
31
32
33# Not really subclass of socket.socket; it's only delegation.
34# It is not covariant to it.
35class dispatcher:
36
37    debug: bool
38    connected: bool
39    accepting: bool
40    connecting: bool
41    closing: bool
42    ignore_log_types: frozenset[str]
43    socket: Optional[SocketType]
44
45    def __init__(self, sock: Optional[SocketType] = ..., map: _maptype = ...) -> None: ...
46    def add_channel(self, map: _maptype = ...) -> None: ...
47    def del_channel(self, map: _maptype = ...) -> None: ...
48    def create_socket(self, family: int, type: int) -> None: ...
49    def set_socket(self, sock: SocketType, map: _maptype = ...) -> None: ...
50    def set_reuse_addr(self) -> None: ...
51    def readable(self) -> bool: ...
52    def writable(self) -> bool: ...
53    def listen(self, backlog: int) -> None: ...
54    def bind(self, address: Union[Tuple[Any, ...], str]) -> None: ...
55    def connect(self, address: Union[Tuple[Any, ...], str]) -> None: ...
56    def accept(self) -> Optional[Tuple[SocketType, Any]]: ...
57    def send(self, data: bytes) -> int: ...
58    def recv(self, buffer_size: int) -> bytes: ...
59    def close(self) -> None: ...
60
61    def log(self, message: Any) -> None: ...
62    def log_info(self, message: Any, type: str = ...) -> None: ...
63    def handle_read_event(self) -> None: ...
64    def handle_connect_event(self) -> None: ...
65    def handle_write_event(self) -> None: ...
66    def handle_expt_event(self) -> None: ...
67    def handle_error(self) -> None: ...
68    def handle_expt(self) -> None: ...
69    def handle_read(self) -> None: ...
70    def handle_write(self) -> None: ...
71    def handle_connect(self) -> None: ...
72    def handle_accept(self) -> None: ...
73    def handle_close(self) -> None: ...
74
75    if sys.version_info < (3, 5):
76        # Historically, some methods were "imported" from `self.socket` by
77        # means of `__getattr__`. This was long deprecated, and as of Python
78        # 3.5 has been removed; simply call the relevant methods directly on
79        # self.socket if necessary.
80
81        def detach(self) -> int: ...
82        def fileno(self) -> int: ...
83
84        # return value is an address
85        def getpeername(self) -> Any: ...
86        def getsockname(self) -> Any: ...
87
88        @overload
89        def getsockopt(self, level: int, optname: int) -> int: ...
90        @overload
91        def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...
92
93        def gettimeout(self) -> float: ...
94        def ioctl(self, control: object,
95                  option: Tuple[int, int, int]) -> None: ...
96        # TODO the return value may be BinaryIO or TextIO, depending on mode
97        def makefile(self, mode: str = ..., buffering: int = ...,
98                     encoding: str = ..., errors: str = ...,
99                     newline: str = ...) -> Any:
100            ...
101
102        # return type is an address
103        def recvfrom(self, bufsize: int, flags: int = ...) -> Any: ...
104        def recvfrom_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ...
105        def recv_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ...
106        def sendall(self, data: bytes, flags: int = ...) -> None: ...
107        def sendto(self, data: bytes, address: Union[Tuple[str, int], str], flags: int = ...) -> int: ...
108        def setblocking(self, flag: bool) -> None: ...
109        def settimeout(self, value: Union[float, None]) -> None: ...
110        def setsockopt(self, level: int, optname: int, value: Union[int, bytes]) -> None: ...
111        def shutdown(self, how: int) -> None: ...
112
113class dispatcher_with_send(dispatcher):
114    def __init__(self, sock: SocketType = ..., map: _maptype = ...) -> None: ...
115    def initiate_send(self) -> None: ...
116    def handle_write(self) -> None: ...
117    # incompatible signature:
118    # def send(self, data: bytes) -> Optional[int]: ...
119
120def compact_traceback() -> Tuple[Tuple[str, str, str], type, type, str]: ...
121def close_all(map: _maptype = ..., ignore_all: bool = ...) -> None: ...
122
123# if os.name == 'posix':
124#    import fcntl
125class file_wrapper:
126    fd: int
127
128    def __init__(self, fd: int) -> None: ...
129    def recv(self, bufsize: int, flags: int = ...) -> bytes: ...
130    def send(self, data: bytes, flags: int = ...) -> int: ...
131
132    @overload
133    def getsockopt(self, level: int, optname: int) -> int: ...
134    @overload
135    def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...
136
137    def read(self, bufsize: int, flags: int = ...) -> bytes: ...
138    def write(self, data: bytes, flags: int = ...) -> int: ...
139
140    def close(self) -> None: ...
141    def fileno(self) -> int: ...
142
143class file_dispatcher(dispatcher):
144    def __init__(self, fd: int, map: _maptype = ...) -> None: ...
145    def set_file(self, fd: int) -> None: ...
146