1from __future__ import annotations
2
3from multiprocessing.synchronize import Event as EventType
4from typing import Any, Awaitable, Callable, Dict, Iterable, Optional, Tuple, Type, Union
5
6import h2.events
7import h11
8
9# Till PEP 544 is accepted
10try:
11    from typing import Literal, Protocol, TypedDict
12except ImportError:
13    from typing_extensions import Literal, Protocol, TypedDict  # type: ignore
14
15from .config import Config, Sockets
16
17H11SendableEvent = Union[h11.Data, h11.EndOfMessage, h11.InformationalResponse, h11.Response]
18
19WorkerFunc = Callable[[Config, Optional[Sockets], Optional[EventType]], None]
20
21
22class ASGIVersions(TypedDict, total=False):
23    spec_version: str
24    version: Union[Literal["2.0"], Literal["3.0"]]
25
26
27class HTTPScope(TypedDict):
28    type: Literal["http"]
29    asgi: ASGIVersions
30    http_version: str
31    method: str
32    scheme: str
33    path: str
34    raw_path: bytes
35    query_string: bytes
36    root_path: str
37    headers: Iterable[Tuple[bytes, bytes]]
38    client: Optional[Tuple[str, int]]
39    server: Optional[Tuple[str, Optional[int]]]
40    extensions: Dict[str, dict]
41
42
43class WebsocketScope(TypedDict):
44    type: Literal["websocket"]
45    asgi: ASGIVersions
46    http_version: str
47    scheme: str
48    path: str
49    raw_path: bytes
50    query_string: bytes
51    root_path: str
52    headers: Iterable[Tuple[bytes, bytes]]
53    client: Optional[Tuple[str, int]]
54    server: Optional[Tuple[str, Optional[int]]]
55    subprotocols: Iterable[str]
56    extensions: Dict[str, dict]
57
58
59class LifespanScope(TypedDict):
60    type: Literal["lifespan"]
61    asgi: ASGIVersions
62
63
64WWWScope = Union[HTTPScope, WebsocketScope]
65Scope = Union[HTTPScope, WebsocketScope, LifespanScope]
66
67
68class HTTPRequestEvent(TypedDict):
69    type: Literal["http.request"]
70    body: bytes
71    more_body: bool
72
73
74class HTTPResponseStartEvent(TypedDict):
75    type: Literal["http.response.start"]
76    status: int
77    headers: Iterable[Tuple[bytes, bytes]]
78
79
80class HTTPResponseBodyEvent(TypedDict):
81    type: Literal["http.response.body"]
82    body: bytes
83    more_body: bool
84
85
86class HTTPServerPushEvent(TypedDict):
87    type: Literal["http.response.push"]
88    path: str
89    headers: Iterable[Tuple[bytes, bytes]]
90
91
92class HTTPDisconnectEvent(TypedDict):
93    type: Literal["http.disconnect"]
94
95
96class WebsocketConnectEvent(TypedDict):
97    type: Literal["websocket.connect"]
98
99
100class WebsocketAcceptEvent(TypedDict):
101    type: Literal["websocket.accept"]
102    subprotocol: Optional[str]
103    headers: Iterable[Tuple[bytes, bytes]]
104
105
106class WebsocketReceiveEvent(TypedDict):
107    type: Literal["websocket.receive"]
108    bytes: Optional[bytes]
109    text: Optional[str]
110
111
112class WebsocketSendEvent(TypedDict):
113    type: Literal["websocket.send"]
114    bytes: Optional[bytes]
115    text: Optional[str]
116
117
118class WebsocketResponseStartEvent(TypedDict):
119    type: Literal["websocket.http.response.start"]
120    status: int
121    headers: Iterable[Tuple[bytes, bytes]]
122
123
124class WebsocketResponseBodyEvent(TypedDict):
125    type: Literal["websocket.http.response.body"]
126    body: bytes
127    more_body: bool
128
129
130class WebsocketDisconnectEvent(TypedDict):
131    type: Literal["websocket.disconnect"]
132    code: int
133
134
135class WebsocketCloseEvent(TypedDict):
136    type: Literal["websocket.close"]
137    code: int
138
139
140class LifespanStartupEvent(TypedDict):
141    type: Literal["lifespan.startup"]
142
143
144class LifespanShutdownEvent(TypedDict):
145    type: Literal["lifespan.shutdown"]
146
147
148class LifespanStartupCompleteEvent(TypedDict):
149    type: Literal["lifespan.startup.complete"]
150
151
152class LifespanStartupFailedEvent(TypedDict):
153    type: Literal["lifespan.startup.failed"]
154    message: str
155
156
157class LifespanShutdownCompleteEvent(TypedDict):
158    type: Literal["lifespan.shutdown.complete"]
159
160
161class LifespanShutdownFailedEvent(TypedDict):
162    type: Literal["lifespan.shutdown.failed"]
163    message: str
164
165
166ASGIReceiveEvent = Union[
167    HTTPRequestEvent,
168    HTTPDisconnectEvent,
169    WebsocketConnectEvent,
170    WebsocketReceiveEvent,
171    WebsocketDisconnectEvent,
172    LifespanStartupEvent,
173    LifespanShutdownEvent,
174]
175
176
177ASGISendEvent = Union[
178    HTTPResponseStartEvent,
179    HTTPResponseBodyEvent,
180    HTTPServerPushEvent,
181    HTTPDisconnectEvent,
182    WebsocketAcceptEvent,
183    WebsocketSendEvent,
184    WebsocketResponseStartEvent,
185    WebsocketResponseBodyEvent,
186    WebsocketCloseEvent,
187    LifespanStartupCompleteEvent,
188    LifespanStartupFailedEvent,
189    LifespanShutdownCompleteEvent,
190    LifespanShutdownFailedEvent,
191]
192
193
194ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]]
195ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]]
196
197
198class ASGI2Protocol(Protocol):
199    # Should replace with a Protocol when PEP 544 is accepted.
200
201    def __init__(self, scope: Scope) -> None:
202        ...
203
204    async def __call__(self, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
205        ...
206
207
208ASGI2Framework = Type[ASGI2Protocol]
209ASGI3Framework = Callable[
210    [
211        Scope,
212        ASGIReceiveCallable,
213        ASGISendCallable,
214    ],
215    Awaitable[None],
216]
217ASGIFramework = Union[ASGI2Framework, ASGI3Framework]
218
219
220class H2SyncStream(Protocol):
221    scope: dict
222
223    def data_received(self, data: bytes) -> None:
224        ...
225
226    def ended(self) -> None:
227        ...
228
229    def reset(self) -> None:
230        ...
231
232    def close(self) -> None:
233        ...
234
235    async def handle_request(
236        self,
237        event: h2.events.RequestReceived,
238        scheme: str,
239        client: Tuple[str, int],
240        server: Tuple[str, int],
241    ) -> None:
242        ...
243
244
245class H2AsyncStream(Protocol):
246    scope: dict
247
248    async def data_received(self, data: bytes) -> None:
249        ...
250
251    async def ended(self) -> None:
252        ...
253
254    async def reset(self) -> None:
255        ...
256
257    async def close(self) -> None:
258        ...
259
260    async def handle_request(
261        self,
262        event: h2.events.RequestReceived,
263        scheme: str,
264        client: Tuple[str, int],
265        server: Tuple[str, int],
266    ) -> None:
267        ...
268
269
270class Event(Protocol):
271    def __init__(self) -> None:
272        ...
273
274    async def clear(self) -> None:
275        ...
276
277    async def set(self) -> None:
278        ...
279
280    async def wait(self) -> None:
281        ...
282
283
284class Context(Protocol):
285    event_class: Type[Event]
286
287    async def spawn_app(
288        self,
289        app: ASGIFramework,
290        config: Config,
291        scope: Scope,
292        send: Callable[[Optional[ASGISendEvent]], Awaitable[None]],
293    ) -> Callable[[ASGIReceiveEvent], Awaitable[None]]:
294        ...
295
296    def spawn(self, func: Callable, *args: Any) -> None:
297        ...
298
299    @staticmethod
300    async def sleep(wait: Union[float, int]) -> None:
301        ...
302
303    @staticmethod
304    def time() -> float:
305        ...
306
307
308class ResponseSummary(TypedDict):
309    status: int
310    headers: Iterable[Tuple[bytes, bytes]]
311