1from typing import Tuple, Union, Optional, Any, Dict, overload 2 3import os 4import select 5import sys 6import time 7import warnings 8from socket import SocketType 9from typing import Optional 10 11from errno import (EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, 12 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, 13 EPIPE, EAGAIN, errorcode) 14 15# cyclic dependence with asynchat 16_maptype = Dict[int, Any] 17 18socket_map: _maptype = ... # Undocumented 19 20class ExitNow(Exception): ... 21 22def read(obj: Any) -> None: ... 23def write(obj: Any) -> None: ... 24def readwrite(obj: Any, flags: int) -> None: ... 25def poll(timeout: float = ..., map: _maptype = ...) -> None: ... 26def poll2(timeout: float = ..., map: _maptype = ...) -> None: ... 27 28poll3 = poll2 29 30def loop(timeout: float = ..., use_poll: bool = ..., map: _maptype = ..., count: Optional[int] = ...) -> None: ... 31 32 33# Not really subclass of socket.socket; it's only delegation. 34# It is not covariant to it. 35class dispatcher: 36 37 debug: bool 38 connected: bool 39 accepting: bool 40 connecting: bool 41 closing: bool 42 ignore_log_types: frozenset[str] 43 socket: Optional[SocketType] 44 45 def __init__(self, sock: Optional[SocketType] = ..., map: _maptype = ...) -> None: ... 46 def add_channel(self, map: _maptype = ...) -> None: ... 47 def del_channel(self, map: _maptype = ...) -> None: ... 48 def create_socket(self, family: int, type: int) -> None: ... 49 def set_socket(self, sock: SocketType, map: _maptype = ...) -> None: ... 50 def set_reuse_addr(self) -> None: ... 51 def readable(self) -> bool: ... 52 def writable(self) -> bool: ... 53 def listen(self, backlog: int) -> None: ... 54 def bind(self, address: Union[Tuple[Any, ...], str]) -> None: ... 55 def connect(self, address: Union[Tuple[Any, ...], str]) -> None: ... 56 def accept(self) -> Optional[Tuple[SocketType, Any]]: ... 57 def send(self, data: bytes) -> int: ... 58 def recv(self, buffer_size: int) -> bytes: ... 59 def close(self) -> None: ... 60 61 def log(self, message: Any) -> None: ... 62 def log_info(self, message: Any, type: str = ...) -> None: ... 63 def handle_read_event(self) -> None: ... 64 def handle_connect_event(self) -> None: ... 65 def handle_write_event(self) -> None: ... 66 def handle_expt_event(self) -> None: ... 67 def handle_error(self) -> None: ... 68 def handle_expt(self) -> None: ... 69 def handle_read(self) -> None: ... 70 def handle_write(self) -> None: ... 71 def handle_connect(self) -> None: ... 72 def handle_accept(self) -> None: ... 73 def handle_close(self) -> None: ... 74 75 if sys.version_info < (3, 5): 76 # Historically, some methods were "imported" from `self.socket` by 77 # means of `__getattr__`. This was long deprecated, and as of Python 78 # 3.5 has been removed; simply call the relevant methods directly on 79 # self.socket if necessary. 80 81 def detach(self) -> int: ... 82 def fileno(self) -> int: ... 83 84 # return value is an address 85 def getpeername(self) -> Any: ... 86 def getsockname(self) -> Any: ... 87 88 @overload 89 def getsockopt(self, level: int, optname: int) -> int: ... 90 @overload 91 def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ... 92 93 def gettimeout(self) -> float: ... 94 def ioctl(self, control: object, 95 option: Tuple[int, int, int]) -> None: ... 96 # TODO the return value may be BinaryIO or TextIO, depending on mode 97 def makefile(self, mode: str = ..., buffering: int = ..., 98 encoding: str = ..., errors: str = ..., 99 newline: str = ...) -> Any: 100 ... 101 102 # return type is an address 103 def recvfrom(self, bufsize: int, flags: int = ...) -> Any: ... 104 def recvfrom_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ... 105 def recv_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ... 106 def sendall(self, data: bytes, flags: int = ...) -> None: ... 107 def sendto(self, data: bytes, address: Union[Tuple[str, int], str], flags: int = ...) -> int: ... 108 def setblocking(self, flag: bool) -> None: ... 109 def settimeout(self, value: Union[float, None]) -> None: ... 110 def setsockopt(self, level: int, optname: int, value: Union[int, bytes]) -> None: ... 111 def shutdown(self, how: int) -> None: ... 112 113class dispatcher_with_send(dispatcher): 114 def __init__(self, sock: SocketType = ..., map: _maptype = ...) -> None: ... 115 def initiate_send(self) -> None: ... 116 def handle_write(self) -> None: ... 117 # incompatible signature: 118 # def send(self, data: bytes) -> Optional[int]: ... 119 120def compact_traceback() -> Tuple[Tuple[str, str, str], type, type, str]: ... 121def close_all(map: _maptype = ..., ignore_all: bool = ...) -> None: ... 122 123# if os.name == 'posix': 124# import fcntl 125class file_wrapper: 126 fd: int 127 128 def __init__(self, fd: int) -> None: ... 129 def recv(self, bufsize: int, flags: int = ...) -> bytes: ... 130 def send(self, data: bytes, flags: int = ...) -> int: ... 131 132 @overload 133 def getsockopt(self, level: int, optname: int) -> int: ... 134 @overload 135 def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ... 136 137 def read(self, bufsize: int, flags: int = ...) -> bytes: ... 138 def write(self, data: bytes, flags: int = ...) -> int: ... 139 140 def close(self) -> None: ... 141 def fileno(self) -> int: ... 142 143class file_dispatcher(dispatcher): 144 def __init__(self, fd: int, map: _maptype = ...) -> None: ... 145 def set_file(self, fd: int) -> None: ... 146