"""Static typing helpers: ASGI scope/message shapes and worker signatures."""
from __future__ import annotations

from multiprocessing.synchronize import Event as EventType
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Optional,
    Tuple,
    Type,
    Union,
)

import h2.events
import h11

# ``Literal``, ``Protocol`` and ``TypedDict`` are part of ``typing`` from
# Python 3.8 onwards; interpreters without them use ``typing_extensions``.
try:
    from typing import Literal, Protocol, TypedDict
except ImportError:
    from typing_extensions import Literal, Protocol, TypedDict  # type: ignore

from .config import Config, Sockets

# Any h11 event object that is accepted on the sending side.
H11SendableEvent = Union[
    h11.Data,
    h11.EndOfMessage,
    h11.InformationalResponse,
    h11.Response,
]

# Signature of a worker entry point: (config, sockets-or-None, event-or-None).
WorkerFunc = Callable[[Config, Optional[Sockets], Optional[EventType]], None]


class ASGIVersions(TypedDict, total=False):
    """Contents of a scope's ``asgi`` entry; ``total=False`` makes both keys optional."""

    spec_version: str
    version: Literal["2.0", "3.0"]


class HTTPScope(TypedDict):
    """Scope dictionary whose ``type`` is ``"http"``."""

    type: Literal["http"]
    asgi: ASGIVersions
    http_version: str
    method: str
    scheme: str
    path: str
    raw_path: bytes
    query_string: bytes
    root_path: str
    headers: Iterable[Tuple[bytes, bytes]]
    client: Optional[Tuple[str, int]]
    server: Optional[Tuple[str, Optional[int]]]
    extensions: Dict[str, dict]


class WebsocketScope(TypedDict):
    """Scope dictionary whose ``type`` is ``"websocket"``.

    Compared with ``HTTPScope`` it has no ``method`` key and adds
    ``subprotocols``.
    """

    type: Literal["websocket"]
    asgi: ASGIVersions
    http_version: str
    scheme: str
    path: str
    raw_path: bytes
    query_string: bytes
    root_path: str
    headers: Iterable[Tuple[bytes, bytes]]
    client: Optional[Tuple[str, int]]
    server: Optional[Tuple[str, Optional[int]]]
    subprotocols: Iterable[str]
    extensions: Dict[str, dict]


class LifespanScope(TypedDict):
    """Scope dictionary whose ``type`` is ``"lifespan"``."""

    type: Literal["lifespan"]
    asgi: ASGIVersions


# The two request-carrying scopes, and the full set including lifespan.
WWWScope = Union[HTTPScope, WebsocketScope]
Scope = Union[WWWScope, LifespanScope]


class HTTPRequestEvent(TypedDict):
    """Message of type ``"http.request"``."""

    type: Literal["http.request"]
    body: bytes
    more_body: bool


class HTTPResponseStartEvent(TypedDict):
    """Message of type ``"http.response.start"``."""

    type: Literal["http.response.start"]
    status: int
    headers: Iterable[Tuple[bytes, bytes]]


class HTTPResponseBodyEvent(TypedDict):
    """Message of type ``"http.response.body"``."""

    type: Literal["http.response.body"]
    body: bytes
    more_body: bool


class HTTPServerPushEvent(TypedDict):
    """Message of type ``"http.response.push"``."""

    type: Literal["http.response.push"]
    path: str
    headers: Iterable[Tuple[bytes, bytes]]


class HTTPDisconnectEvent(TypedDict):
    """Message of type ``"http.disconnect"``; carries no other keys."""

    type: Literal["http.disconnect"]


class WebsocketConnectEvent(TypedDict):
    """Message of type ``"websocket.connect"``; carries no other keys."""

    type: Literal["websocket.connect"]


class WebsocketAcceptEvent(TypedDict):
    """Message of type ``"websocket.accept"``."""

    type: Literal["websocket.accept"]
    subprotocol: Optional[str]
    headers: Iterable[Tuple[bytes, bytes]]


class WebsocketReceiveEvent(TypedDict):
    """Message of type ``"websocket.receive"`` with ``bytes`` and ``text`` payload slots."""

    type: Literal["websocket.receive"]
    bytes: Optional[bytes]
    text: Optional[str]


class WebsocketSendEvent(TypedDict):
    """Message of type ``"websocket.send"`` with ``bytes`` and ``text`` payload slots."""

    type: Literal["websocket.send"]
    bytes: Optional[bytes]
    text: Optional[str]


class WebsocketResponseStartEvent(TypedDict):
    """Message of type ``"websocket.http.response.start"``."""

    type: Literal["websocket.http.response.start"]
    status: int
    headers: Iterable[Tuple[bytes, bytes]]


class WebsocketResponseBodyEvent(TypedDict):
    """Message of type ``"websocket.http.response.body"``."""

    type: Literal["websocket.http.response.body"]
    body: bytes
    more_body: bool


class WebsocketDisconnectEvent(TypedDict):
    """Message of type ``"websocket.disconnect"`` with an integer ``code``."""

    type: Literal["websocket.disconnect"]
    code: int


class WebsocketCloseEvent(TypedDict):
    """Message of type ``"websocket.close"`` with an integer ``code``."""

    type: Literal["websocket.close"]
    code: int


class LifespanStartupEvent(TypedDict):
    """Message of type ``"lifespan.startup"``; carries no other keys."""

    type: Literal["lifespan.startup"]


class LifespanShutdownEvent(TypedDict):
    """Message of type ``"lifespan.shutdown"``; carries no other keys."""

    type: Literal["lifespan.shutdown"]


class LifespanStartupCompleteEvent(TypedDict):
    """Message of type ``"lifespan.startup.complete"``; carries no other keys."""

    type: Literal["lifespan.startup.complete"]


class LifespanStartupFailedEvent(TypedDict):
    """Message of type ``"lifespan.startup.failed"`` with a ``message`` string."""

    type: Literal["lifespan.startup.failed"]
    message: str


class LifespanShutdownCompleteEvent(TypedDict):
    """Message of type ``"lifespan.shutdown.complete"``; carries no other keys."""

    type: Literal["lifespan.shutdown.complete"]


class LifespanShutdownFailedEvent(TypedDict):
    """Message of type ``"lifespan.shutdown.failed"`` with a ``message`` string."""

    type: Literal["lifespan.shutdown.failed"]
    message: str


# Every message shape that the ``receive`` callable may yield.
ASGIReceiveEvent = Union[
    HTTPRequestEvent,
    HTTPDisconnectEvent,
    WebsocketConnectEvent,
    WebsocketReceiveEvent,
    WebsocketDisconnectEvent,
    LifespanStartupEvent,
    LifespanShutdownEvent,
]


# Every message shape that the ``send`` callable accepts.
ASGISendEvent = Union[
    HTTPResponseStartEvent,
    HTTPResponseBodyEvent,
    HTTPServerPushEvent,
    HTTPDisconnectEvent,
    WebsocketAcceptEvent,
    WebsocketSendEvent,
    WebsocketResponseStartEvent,
    WebsocketResponseBodyEvent,
    WebsocketCloseEvent,
    LifespanStartupCompleteEvent,
    LifespanStartupFailedEvent,
    LifespanShutdownCompleteEvent,
    LifespanShutdownFailedEvent,
]


ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]]
ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]]


class ASGI2Protocol(Protocol):
    """Structural type of a two-step application.

    The object is constructed with the scope and the resulting instance is
    then awaited with the ``receive`` and ``send`` callables.
    """

    def __init__(self, scope: Scope) -> None:
        """Create the application instance for ``scope``."""

    async def __call__(self, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
        """Run the instance with the ``receive`` and ``send`` callables."""


# ASGI2Framework is the class itself (it gets instantiated per scope);
# ASGI3Framework is a single awaitable callable taking (scope, receive, send).
ASGI2Framework = Type[ASGI2Protocol]
ASGI3Framework = Callable[[Scope, ASGIReceiveCallable, ASGISendCallable], Awaitable[None]]
ASGIFramework = Union[ASGI2Framework, ASGI3Framework]


class H2SyncStream(Protocol):
    """Structural interface of an HTTP/2 stream with synchronous hooks.

    ``data_received``, ``ended``, ``reset`` and ``close`` are plain methods;
    only ``handle_request`` is a coroutine.
    """

    scope: dict

    def data_received(self, data: bytes) -> None:
        """Take the ``data`` bytes for this stream."""

    def ended(self) -> None:
        """Hook named ``ended``; takes no arguments and returns nothing."""

    def reset(self) -> None:
        """Hook named ``reset``; takes no arguments and returns nothing."""

    def close(self) -> None:
        """Hook named ``close``; takes no arguments and returns nothing."""

    async def handle_request(
        self,
        event: h2.events.RequestReceived,
        scheme: str,
        client: Tuple[str, int],
        server: Tuple[str, int],
    ) -> None:
        """Process the ``RequestReceived`` ``event`` for the given scheme and addresses."""


class H2AsyncStream(Protocol):
    """Structural interface of an HTTP/2 stream where every hook is a coroutine."""

    scope: dict

    async def data_received(self, data: bytes) -> None:
        """Take the ``data`` bytes for this stream."""

    async def ended(self) -> None:
        """Hook named ``ended``; takes no arguments and returns nothing."""

    async def reset(self) -> None:
        """Hook named ``reset``; takes no arguments and returns nothing."""

    async def close(self) -> None:
        """Hook named ``close``; takes no arguments and returns nothing."""

    async def handle_request(
        self,
        event: h2.events.RequestReceived,
        scheme: str,
        client: Tuple[str, int],
        server: Tuple[str, int],
    ) -> None:
        """Process the ``RequestReceived`` ``event`` for the given scheme and addresses."""


class Event(Protocol):
    """Structural interface of an event object whose operations are all coroutines."""

    def __init__(self) -> None:
        """Construct the event; no arguments are taken."""

    async def clear(self) -> None:
        """Coroutine counterpart of an event's ``clear``."""

    async def set(self) -> None:
        """Coroutine counterpart of an event's ``set``."""

    async def wait(self) -> None:
        """Coroutine counterpart of an event's ``wait``."""


class Context(Protocol):
    """Structural interface bundling the task, sleep, clock and event facilities."""

    # Concrete class satisfying the ``Event`` protocol.
    event_class: Type[Event]

    async def spawn_app(
        self,
        app: ASGIFramework,
        config: Config,
        scope: Scope,
        send: Callable[[Optional[ASGISendEvent]], Awaitable[None]],
    ) -> Callable[[ASGIReceiveEvent], Awaitable[None]]:
        """Spawn ``app`` for ``scope``.

        ``send`` takes an ``ASGISendEvent`` or ``None``. The returned
        callable takes an ``ASGIReceiveEvent`` and is awaited.
        """

    def spawn(self, func: Callable, *args: Any) -> None:
        """Spawn ``func`` with the positional ``args``; nothing is returned."""

    @staticmethod
    async def sleep(wait: Union[float, int]) -> None:
        """Sleep for ``wait`` (a float or int) without needing an instance."""

    @staticmethod
    def time() -> float:
        """Return a float time value without needing an instance."""


class ResponseSummary(TypedDict):
    """Dictionary holding a response's ``status`` and ``headers``."""

    status: int
    headers: Iterable[Tuple[bytes, bytes]]