1"""HTTP Client for asyncio."""
2
3import asyncio
4import base64
5import hashlib
6import json
7import os
8import sys
9import traceback
10import warnings
11from types import SimpleNamespace, TracebackType
12from typing import (
13    Any,
14    Awaitable,
15    Callable,
16    Coroutine,
17    FrozenSet,
18    Generator,
19    Generic,
20    Iterable,
21    List,
22    Mapping,
23    Optional,
24    Set,
25    Tuple,
26    Type,
27    TypeVar,
28    Union,
29)
30
31import attr
32from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
33from yarl import URL
34
35from . import hdrs, http, payload
36from .abc import AbstractCookieJar
37from .client_exceptions import (
38    ClientConnectionError as ClientConnectionError,
39    ClientConnectorCertificateError as ClientConnectorCertificateError,
40    ClientConnectorError as ClientConnectorError,
41    ClientConnectorSSLError as ClientConnectorSSLError,
42    ClientError as ClientError,
43    ClientHttpProxyError as ClientHttpProxyError,
44    ClientOSError as ClientOSError,
45    ClientPayloadError as ClientPayloadError,
46    ClientProxyConnectionError as ClientProxyConnectionError,
47    ClientResponseError as ClientResponseError,
48    ClientSSLError as ClientSSLError,
49    ContentTypeError as ContentTypeError,
50    InvalidURL as InvalidURL,
51    ServerConnectionError as ServerConnectionError,
52    ServerDisconnectedError as ServerDisconnectedError,
53    ServerFingerprintMismatch as ServerFingerprintMismatch,
54    ServerTimeoutError as ServerTimeoutError,
55    TooManyRedirects as TooManyRedirects,
56    WSServerHandshakeError as WSServerHandshakeError,
57)
58from .client_reqrep import (
59    ClientRequest as ClientRequest,
60    ClientResponse as ClientResponse,
61    Fingerprint as Fingerprint,
62    RequestInfo as RequestInfo,
63    _merge_ssl_params,
64)
65from .client_ws import ClientWebSocketResponse as ClientWebSocketResponse
66from .connector import (
67    BaseConnector as BaseConnector,
68    NamedPipeConnector as NamedPipeConnector,
69    TCPConnector as TCPConnector,
70    UnixConnector as UnixConnector,
71)
72from .cookiejar import CookieJar
73from .helpers import (
74    DEBUG,
75    PY_36,
76    BasicAuth,
77    CeilTimeout,
78    TimeoutHandle,
79    get_running_loop,
80    proxies_from_env,
81    sentinel,
82    strip_auth_from_url,
83)
84from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
85from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse
86from .streams import FlowControlDataQueue
87from .tracing import Trace, TraceConfig
88from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL
89
90__all__ = (
91    # client_exceptions
92    "ClientConnectionError",
93    "ClientConnectorCertificateError",
94    "ClientConnectorError",
95    "ClientConnectorSSLError",
96    "ClientError",
97    "ClientHttpProxyError",
98    "ClientOSError",
99    "ClientPayloadError",
100    "ClientProxyConnectionError",
101    "ClientResponseError",
102    "ClientSSLError",
103    "ContentTypeError",
104    "InvalidURL",
105    "ServerConnectionError",
106    "ServerDisconnectedError",
107    "ServerFingerprintMismatch",
108    "ServerTimeoutError",
109    "TooManyRedirects",
110    "WSServerHandshakeError",
111    # client_reqrep
112    "ClientRequest",
113    "ClientResponse",
114    "Fingerprint",
115    "RequestInfo",
116    # connector
117    "BaseConnector",
118    "TCPConnector",
119    "UnixConnector",
120    "NamedPipeConnector",
121    # client_ws
122    "ClientWebSocketResponse",
123    # client
124    "ClientSession",
125    "ClientTimeout",
126    "request",
127)
128
129
130try:
131    from ssl import SSLContext
132except ImportError:  # pragma: no cover
133    SSLContext = object  # type: ignore
134
135
136@attr.s(auto_attribs=True, frozen=True, slots=True)
137class ClientTimeout:
138    total: Optional[float] = None
139    connect: Optional[float] = None
140    sock_read: Optional[float] = None
141    sock_connect: Optional[float] = None
142
143    # pool_queue_timeout: Optional[float] = None
144    # dns_resolution_timeout: Optional[float] = None
145    # socket_connect_timeout: Optional[float] = None
146    # connection_acquiring_timeout: Optional[float] = None
147    # new_connection_timeout: Optional[float] = None
148    # http_header_timeout: Optional[float] = None
149    # response_body_timeout: Optional[float] = None
150
151    # to create a timeout specific for a single request, either
152    # - create a completely new one to overwrite the default
153    # - or use http://www.attrs.org/en/stable/api.html#attr.evolve
154    # to overwrite the defaults
155
156
157# 5 Minute default read timeout
158DEFAULT_TIMEOUT = ClientTimeout(total=5 * 60)
159
160_RetType = TypeVar("_RetType")
161
162
163class ClientSession:
164    """First-class interface for making HTTP requests."""
165
166    ATTRS = frozenset(
167        [
168            "_source_traceback",
169            "_connector",
170            "requote_redirect_url",
171            "_loop",
172            "_cookie_jar",
173            "_connector_owner",
174            "_default_auth",
175            "_version",
176            "_json_serialize",
177            "_requote_redirect_url",
178            "_timeout",
179            "_raise_for_status",
180            "_auto_decompress",
181            "_trust_env",
182            "_default_headers",
183            "_skip_auto_headers",
184            "_request_class",
185            "_response_class",
186            "_ws_response_class",
187            "_trace_configs",
188            "_read_bufsize",
189        ]
190    )
191
192    _source_traceback = None
193
194    def __init__(
195        self,
196        *,
197        connector: Optional[BaseConnector] = None,
198        loop: Optional[asyncio.AbstractEventLoop] = None,
199        cookies: Optional[LooseCookies] = None,
200        headers: Optional[LooseHeaders] = None,
201        skip_auto_headers: Optional[Iterable[str]] = None,
202        auth: Optional[BasicAuth] = None,
203        json_serialize: JSONEncoder = json.dumps,
204        request_class: Type[ClientRequest] = ClientRequest,
205        response_class: Type[ClientResponse] = ClientResponse,
206        ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse,
207        version: HttpVersion = http.HttpVersion11,
208        cookie_jar: Optional[AbstractCookieJar] = None,
209        connector_owner: bool = True,
210        raise_for_status: bool = False,
211        read_timeout: Union[float, object] = sentinel,
212        conn_timeout: Optional[float] = None,
213        timeout: Union[object, ClientTimeout] = sentinel,
214        auto_decompress: bool = True,
215        trust_env: bool = False,
216        requote_redirect_url: bool = True,
217        trace_configs: Optional[List[TraceConfig]] = None,
218        read_bufsize: int = 2 ** 16,
219    ) -> None:
220
221        if loop is None:
222            if connector is not None:
223                loop = connector._loop
224
225        loop = get_running_loop(loop)
226
227        if connector is None:
228            connector = TCPConnector(loop=loop)
229
230        if connector._loop is not loop:
231            raise RuntimeError("Session and connector has to use same event loop")
232
233        self._loop = loop
234
235        if loop.get_debug():
236            self._source_traceback = traceback.extract_stack(sys._getframe(1))
237
238        if cookie_jar is None:
239            cookie_jar = CookieJar(loop=loop)
240        self._cookie_jar = cookie_jar
241
242        if cookies is not None:
243            self._cookie_jar.update_cookies(cookies)
244
245        self._connector = connector  # type: Optional[BaseConnector]
246        self._connector_owner = connector_owner
247        self._default_auth = auth
248        self._version = version
249        self._json_serialize = json_serialize
250        if timeout is sentinel:
251            self._timeout = DEFAULT_TIMEOUT
252            if read_timeout is not sentinel:
253                warnings.warn(
254                    "read_timeout is deprecated, " "use timeout argument instead",
255                    DeprecationWarning,
256                    stacklevel=2,
257                )
258                self._timeout = attr.evolve(self._timeout, total=read_timeout)
259            if conn_timeout is not None:
260                self._timeout = attr.evolve(self._timeout, connect=conn_timeout)
261                warnings.warn(
262                    "conn_timeout is deprecated, " "use timeout argument instead",
263                    DeprecationWarning,
264                    stacklevel=2,
265                )
266        else:
267            self._timeout = timeout  # type: ignore
268            if read_timeout is not sentinel:
269                raise ValueError(
270                    "read_timeout and timeout parameters "
271                    "conflict, please setup "
272                    "timeout.read"
273                )
274            if conn_timeout is not None:
275                raise ValueError(
276                    "conn_timeout and timeout parameters "
277                    "conflict, please setup "
278                    "timeout.connect"
279                )
280        self._raise_for_status = raise_for_status
281        self._auto_decompress = auto_decompress
282        self._trust_env = trust_env
283        self._requote_redirect_url = requote_redirect_url
284        self._read_bufsize = read_bufsize
285
286        # Convert to list of tuples
287        if headers:
288            real_headers = CIMultiDict(headers)  # type: CIMultiDict[str]
289        else:
290            real_headers = CIMultiDict()
291        self._default_headers = real_headers  # type: CIMultiDict[str]
292        if skip_auto_headers is not None:
293            self._skip_auto_headers = frozenset([istr(i) for i in skip_auto_headers])
294        else:
295            self._skip_auto_headers = frozenset()
296
297        self._request_class = request_class
298        self._response_class = response_class
299        self._ws_response_class = ws_response_class
300
301        self._trace_configs = trace_configs or []
302        for trace_config in self._trace_configs:
303            trace_config.freeze()
304
305    def __init_subclass__(cls: Type["ClientSession"]) -> None:
306        warnings.warn(
307            "Inheritance class {} from ClientSession "
308            "is discouraged".format(cls.__name__),
309            DeprecationWarning,
310            stacklevel=2,
311        )
312
313    if DEBUG:
314
315        def __setattr__(self, name: str, val: Any) -> None:
316            if name not in self.ATTRS:
317                warnings.warn(
318                    "Setting custom ClientSession.{} attribute "
319                    "is discouraged".format(name),
320                    DeprecationWarning,
321                    stacklevel=2,
322                )
323            super().__setattr__(name, val)
324
325    def __del__(self, _warnings: Any = warnings) -> None:
326        if not self.closed:
327            if PY_36:
328                kwargs = {"source": self}
329            else:
330                kwargs = {}
331            _warnings.warn(
332                f"Unclosed client session {self!r}", ResourceWarning, **kwargs
333            )
334            context = {"client_session": self, "message": "Unclosed client session"}
335            if self._source_traceback is not None:
336                context["source_traceback"] = self._source_traceback
337            self._loop.call_exception_handler(context)
338
339    def request(
340        self, method: str, url: StrOrURL, **kwargs: Any
341    ) -> "_RequestContextManager":
342        """Perform HTTP request."""
343        return _RequestContextManager(self._request(method, url, **kwargs))
344
345    async def _request(
346        self,
347        method: str,
348        str_or_url: StrOrURL,
349        *,
350        params: Optional[Mapping[str, str]] = None,
351        data: Any = None,
352        json: Any = None,
353        cookies: Optional[LooseCookies] = None,
354        headers: Optional[LooseHeaders] = None,
355        skip_auto_headers: Optional[Iterable[str]] = None,
356        auth: Optional[BasicAuth] = None,
357        allow_redirects: bool = True,
358        max_redirects: int = 10,
359        compress: Optional[str] = None,
360        chunked: Optional[bool] = None,
361        expect100: bool = False,
362        raise_for_status: Optional[bool] = None,
363        read_until_eof: bool = True,
364        proxy: Optional[StrOrURL] = None,
365        proxy_auth: Optional[BasicAuth] = None,
366        timeout: Union[ClientTimeout, object] = sentinel,
367        verify_ssl: Optional[bool] = None,
368        fingerprint: Optional[bytes] = None,
369        ssl_context: Optional[SSLContext] = None,
370        ssl: Optional[Union[SSLContext, bool, Fingerprint]] = None,
371        proxy_headers: Optional[LooseHeaders] = None,
372        trace_request_ctx: Optional[SimpleNamespace] = None,
373        read_bufsize: Optional[int] = None,
374    ) -> ClientResponse:
375
376        # NOTE: timeout clamps existing connect and read timeouts.  We cannot
377        # set the default to None because we need to detect if the user wants
378        # to use the existing timeouts by setting timeout to None.
379
380        if self.closed:
381            raise RuntimeError("Session is closed")
382
383        ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
384
385        if data is not None and json is not None:
386            raise ValueError(
387                "data and json parameters can not be used at the same time"
388            )
389        elif json is not None:
390            data = payload.JsonPayload(json, dumps=self._json_serialize)
391
392        if not isinstance(chunked, bool) and chunked is not None:
393            warnings.warn("Chunk size is deprecated #1615", DeprecationWarning)
394
395        redirects = 0
396        history = []
397        version = self._version
398
399        # Merge with default headers and transform to CIMultiDict
400        headers = self._prepare_headers(headers)
401        proxy_headers = self._prepare_headers(proxy_headers)
402
403        try:
404            url = URL(str_or_url)
405        except ValueError as e:
406            raise InvalidURL(str_or_url) from e
407
408        skip_headers = set(self._skip_auto_headers)
409        if skip_auto_headers is not None:
410            for i in skip_auto_headers:
411                skip_headers.add(istr(i))
412
413        if proxy is not None:
414            try:
415                proxy = URL(proxy)
416            except ValueError as e:
417                raise InvalidURL(proxy) from e
418
419        if timeout is sentinel:
420            real_timeout = self._timeout  # type: ClientTimeout
421        else:
422            if not isinstance(timeout, ClientTimeout):
423                real_timeout = ClientTimeout(total=timeout)  # type: ignore
424            else:
425                real_timeout = timeout
426        # timeout is cumulative for all request operations
427        # (request, redirects, responses, data consuming)
428        tm = TimeoutHandle(self._loop, real_timeout.total)
429        handle = tm.start()
430
431        if read_bufsize is None:
432            read_bufsize = self._read_bufsize
433
434        traces = [
435            Trace(
436                self,
437                trace_config,
438                trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx),
439            )
440            for trace_config in self._trace_configs
441        ]
442
443        for trace in traces:
444            await trace.send_request_start(method, url, headers)
445
446        timer = tm.timer()
447        try:
448            with timer:
449                while True:
450                    url, auth_from_url = strip_auth_from_url(url)
451                    if auth and auth_from_url:
452                        raise ValueError(
453                            "Cannot combine AUTH argument with "
454                            "credentials encoded in URL"
455                        )
456
457                    if auth is None:
458                        auth = auth_from_url
459                    if auth is None:
460                        auth = self._default_auth
461                    # It would be confusing if we support explicit
462                    # Authorization header with auth argument
463                    if (
464                        headers is not None
465                        and auth is not None
466                        and hdrs.AUTHORIZATION in headers
467                    ):
468                        raise ValueError(
469                            "Cannot combine AUTHORIZATION header "
470                            "with AUTH argument or credentials "
471                            "encoded in URL"
472                        )
473
474                    all_cookies = self._cookie_jar.filter_cookies(url)
475
476                    if cookies is not None:
477                        tmp_cookie_jar = CookieJar()
478                        tmp_cookie_jar.update_cookies(cookies)
479                        req_cookies = tmp_cookie_jar.filter_cookies(url)
480                        if req_cookies:
481                            all_cookies.load(req_cookies)
482
483                    if proxy is not None:
484                        proxy = URL(proxy)
485                    elif self._trust_env:
486                        for scheme, proxy_info in proxies_from_env().items():
487                            if scheme == url.scheme:
488                                proxy = proxy_info.proxy
489                                proxy_auth = proxy_info.proxy_auth
490                                break
491
492                    req = self._request_class(
493                        method,
494                        url,
495                        params=params,
496                        headers=headers,
497                        skip_auto_headers=skip_headers,
498                        data=data,
499                        cookies=all_cookies,
500                        auth=auth,
501                        version=version,
502                        compress=compress,
503                        chunked=chunked,
504                        expect100=expect100,
505                        loop=self._loop,
506                        response_class=self._response_class,
507                        proxy=proxy,
508                        proxy_auth=proxy_auth,
509                        timer=timer,
510                        session=self,
511                        ssl=ssl,
512                        proxy_headers=proxy_headers,
513                        traces=traces,
514                    )
515
516                    # connection timeout
517                    try:
518                        with CeilTimeout(real_timeout.connect, loop=self._loop):
519                            assert self._connector is not None
520                            conn = await self._connector.connect(
521                                req, traces=traces, timeout=real_timeout
522                            )
523                    except asyncio.TimeoutError as exc:
524                        raise ServerTimeoutError(
525                            "Connection timeout " "to host {}".format(url)
526                        ) from exc
527
528                    assert conn.transport is not None
529
530                    assert conn.protocol is not None
531                    conn.protocol.set_response_params(
532                        timer=timer,
533                        skip_payload=method.upper() == "HEAD",
534                        read_until_eof=read_until_eof,
535                        auto_decompress=self._auto_decompress,
536                        read_timeout=real_timeout.sock_read,
537                        read_bufsize=read_bufsize,
538                    )
539
540                    try:
541                        try:
542                            resp = await req.send(conn)
543                            try:
544                                await resp.start(conn)
545                            except BaseException:
546                                resp.close()
547                                raise
548                        except BaseException:
549                            conn.close()
550                            raise
551                    except ClientError:
552                        raise
553                    except OSError as exc:
554                        raise ClientOSError(*exc.args) from exc
555
556                    self._cookie_jar.update_cookies(resp.cookies, resp.url)
557
558                    # redirects
559                    if resp.status in (301, 302, 303, 307, 308) and allow_redirects:
560
561                        for trace in traces:
562                            await trace.send_request_redirect(
563                                method, url, headers, resp
564                            )
565
566                        redirects += 1
567                        history.append(resp)
568                        if max_redirects and redirects >= max_redirects:
569                            resp.close()
570                            raise TooManyRedirects(
571                                history[0].request_info, tuple(history)
572                            )
573
574                        # For 301 and 302, mimic IE, now changed in RFC
575                        # https://github.com/kennethreitz/requests/pull/269
576                        if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or (
577                            resp.status in (301, 302) and resp.method == hdrs.METH_POST
578                        ):
579                            method = hdrs.METH_GET
580                            data = None
581                            if headers.get(hdrs.CONTENT_LENGTH):
582                                headers.pop(hdrs.CONTENT_LENGTH)
583
584                        r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(
585                            hdrs.URI
586                        )
587                        if r_url is None:
588                            # see github.com/aio-libs/aiohttp/issues/2022
589                            break
590                        else:
591                            # reading from correct redirection
592                            # response is forbidden
593                            resp.release()
594
595                        try:
596                            parsed_url = URL(
597                                r_url, encoded=not self._requote_redirect_url
598                            )
599
600                        except ValueError as e:
601                            raise InvalidURL(r_url) from e
602
603                        scheme = parsed_url.scheme
604                        if scheme not in ("http", "https", ""):
605                            resp.close()
606                            raise ValueError("Can redirect only to http or https")
607                        elif not scheme:
608                            parsed_url = url.join(parsed_url)
609
610                        if url.origin() != parsed_url.origin():
611                            auth = None
612                            headers.pop(hdrs.AUTHORIZATION, None)
613
614                        url = parsed_url
615                        params = None
616                        resp.release()
617                        continue
618
619                    break
620
621            # check response status
622            if raise_for_status is None:
623                raise_for_status = self._raise_for_status
624            if raise_for_status:
625                resp.raise_for_status()
626
627            # register connection
628            if handle is not None:
629                if resp.connection is not None:
630                    resp.connection.add_callback(handle.cancel)
631                else:
632                    handle.cancel()
633
634            resp._history = tuple(history)
635
636            for trace in traces:
637                await trace.send_request_end(method, url, headers, resp)
638            return resp
639
640        except BaseException as e:
641            # cleanup timer
642            tm.close()
643            if handle:
644                handle.cancel()
645                handle = None
646
647            for trace in traces:
648                await trace.send_request_exception(method, url, headers, e)
649            raise
650
651    def ws_connect(
652        self,
653        url: StrOrURL,
654        *,
655        method: str = hdrs.METH_GET,
656        protocols: Iterable[str] = (),
657        timeout: float = 10.0,
658        receive_timeout: Optional[float] = None,
659        autoclose: bool = True,
660        autoping: bool = True,
661        heartbeat: Optional[float] = None,
662        auth: Optional[BasicAuth] = None,
663        origin: Optional[str] = None,
664        headers: Optional[LooseHeaders] = None,
665        proxy: Optional[StrOrURL] = None,
666        proxy_auth: Optional[BasicAuth] = None,
667        ssl: Union[SSLContext, bool, None, Fingerprint] = None,
668        verify_ssl: Optional[bool] = None,
669        fingerprint: Optional[bytes] = None,
670        ssl_context: Optional[SSLContext] = None,
671        proxy_headers: Optional[LooseHeaders] = None,
672        compress: int = 0,
673        max_msg_size: int = 4 * 1024 * 1024,
674    ) -> "_WSRequestContextManager":
675        """Initiate websocket connection."""
676        return _WSRequestContextManager(
677            self._ws_connect(
678                url,
679                method=method,
680                protocols=protocols,
681                timeout=timeout,
682                receive_timeout=receive_timeout,
683                autoclose=autoclose,
684                autoping=autoping,
685                heartbeat=heartbeat,
686                auth=auth,
687                origin=origin,
688                headers=headers,
689                proxy=proxy,
690                proxy_auth=proxy_auth,
691                ssl=ssl,
692                verify_ssl=verify_ssl,
693                fingerprint=fingerprint,
694                ssl_context=ssl_context,
695                proxy_headers=proxy_headers,
696                compress=compress,
697                max_msg_size=max_msg_size,
698            )
699        )
700
701    async def _ws_connect(
702        self,
703        url: StrOrURL,
704        *,
705        method: str = hdrs.METH_GET,
706        protocols: Iterable[str] = (),
707        timeout: float = 10.0,
708        receive_timeout: Optional[float] = None,
709        autoclose: bool = True,
710        autoping: bool = True,
711        heartbeat: Optional[float] = None,
712        auth: Optional[BasicAuth] = None,
713        origin: Optional[str] = None,
714        headers: Optional[LooseHeaders] = None,
715        proxy: Optional[StrOrURL] = None,
716        proxy_auth: Optional[BasicAuth] = None,
717        ssl: Union[SSLContext, bool, None, Fingerprint] = None,
718        verify_ssl: Optional[bool] = None,
719        fingerprint: Optional[bytes] = None,
720        ssl_context: Optional[SSLContext] = None,
721        proxy_headers: Optional[LooseHeaders] = None,
722        compress: int = 0,
723        max_msg_size: int = 4 * 1024 * 1024,
724    ) -> ClientWebSocketResponse:
725
726        if headers is None:
727            real_headers = CIMultiDict()  # type: CIMultiDict[str]
728        else:
729            real_headers = CIMultiDict(headers)
730
731        default_headers = {
732            hdrs.UPGRADE: "websocket",
733            hdrs.CONNECTION: "upgrade",
734            hdrs.SEC_WEBSOCKET_VERSION: "13",
735        }
736
737        for key, value in default_headers.items():
738            real_headers.setdefault(key, value)
739
740        sec_key = base64.b64encode(os.urandom(16))
741        real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode()
742
743        if protocols:
744            real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols)
745        if origin is not None:
746            real_headers[hdrs.ORIGIN] = origin
747        if compress:
748            extstr = ws_ext_gen(compress=compress)
749            real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr
750
751        ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
752
753        # send request
754        resp = await self.request(
755            method,
756            url,
757            headers=real_headers,
758            read_until_eof=False,
759            auth=auth,
760            proxy=proxy,
761            proxy_auth=proxy_auth,
762            ssl=ssl,
763            proxy_headers=proxy_headers,
764        )
765
766        try:
767            # check handshake
768            if resp.status != 101:
769                raise WSServerHandshakeError(
770                    resp.request_info,
771                    resp.history,
772                    message="Invalid response status",
773                    status=resp.status,
774                    headers=resp.headers,
775                )
776
777            if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket":
778                raise WSServerHandshakeError(
779                    resp.request_info,
780                    resp.history,
781                    message="Invalid upgrade header",
782                    status=resp.status,
783                    headers=resp.headers,
784                )
785
786            if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade":
787                raise WSServerHandshakeError(
788                    resp.request_info,
789                    resp.history,
790                    message="Invalid connection header",
791                    status=resp.status,
792                    headers=resp.headers,
793                )
794
795            # key calculation
796            r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "")
797            match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode()
798            if r_key != match:
799                raise WSServerHandshakeError(
800                    resp.request_info,
801                    resp.history,
802                    message="Invalid challenge response",
803                    status=resp.status,
804                    headers=resp.headers,
805                )
806
807            # websocket protocol
808            protocol = None
809            if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers:
810                resp_protocols = [
811                    proto.strip()
812                    for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
813                ]
814
815                for proto in resp_protocols:
816                    if proto in protocols:
817                        protocol = proto
818                        break
819
820            # websocket compress
821            notakeover = False
822            if compress:
823                compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
824                if compress_hdrs:
825                    try:
826                        compress, notakeover = ws_ext_parse(compress_hdrs)
827                    except WSHandshakeError as exc:
828                        raise WSServerHandshakeError(
829                            resp.request_info,
830                            resp.history,
831                            message=exc.args[0],
832                            status=resp.status,
833                            headers=resp.headers,
834                        ) from exc
835                else:
836                    compress = 0
837                    notakeover = False
838
839            conn = resp.connection
840            assert conn is not None
841            conn_proto = conn.protocol
842            assert conn_proto is not None
843            transport = conn.transport
844            assert transport is not None
845            reader = FlowControlDataQueue(
846                conn_proto, 2 ** 16, loop=self._loop
847            )  # type: FlowControlDataQueue[WSMessage]
848            conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
849            writer = WebSocketWriter(
850                conn_proto,
851                transport,
852                use_mask=True,
853                compress=compress,
854                notakeover=notakeover,
855            )
856        except BaseException:
857            resp.close()
858            raise
859        else:
860            return self._ws_response_class(
861                reader,
862                writer,
863                protocol,
864                resp,
865                timeout,
866                autoclose,
867                autoping,
868                self._loop,
869                receive_timeout=receive_timeout,
870                heartbeat=heartbeat,
871                compress=compress,
872                client_notakeover=notakeover,
873            )
874
875    def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]":
876        """Add default headers and transform it to CIMultiDict"""
877        # Convert headers to MultiDict
878        result = CIMultiDict(self._default_headers)
879        if headers:
880            if not isinstance(headers, (MultiDictProxy, MultiDict)):
881                headers = CIMultiDict(headers)
882            added_names = set()  # type: Set[str]
883            for key, value in headers.items():
884                if key in added_names:
885                    result.add(key, value)
886                else:
887                    result[key] = value
888                    added_names.add(key)
889        return result
890
891    def get(
892        self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
893    ) -> "_RequestContextManager":
894        """Perform HTTP GET request."""
895        return _RequestContextManager(
896            self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs)
897        )
898
899    def options(
900        self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
901    ) -> "_RequestContextManager":
902        """Perform HTTP OPTIONS request."""
903        return _RequestContextManager(
904            self._request(
905                hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs
906            )
907        )
908
909    def head(
910        self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any
911    ) -> "_RequestContextManager":
912        """Perform HTTP HEAD request."""
913        return _RequestContextManager(
914            self._request(
915                hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs
916            )
917        )
918
919    def post(
920        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
921    ) -> "_RequestContextManager":
922        """Perform HTTP POST request."""
923        return _RequestContextManager(
924            self._request(hdrs.METH_POST, url, data=data, **kwargs)
925        )
926
927    def put(
928        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
929    ) -> "_RequestContextManager":
930        """Perform HTTP PUT request."""
931        return _RequestContextManager(
932            self._request(hdrs.METH_PUT, url, data=data, **kwargs)
933        )
934
935    def patch(
936        self, url: StrOrURL, *, data: Any = None, **kwargs: Any
937    ) -> "_RequestContextManager":
938        """Perform HTTP PATCH request."""
939        return _RequestContextManager(
940            self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
941        )
942
943    def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager":
944        """Perform HTTP DELETE request."""
945        return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs))
946
947    async def close(self) -> None:
948        """Close underlying connector.
949
950        Release all acquired resources.
951        """
952        if not self.closed:
953            if self._connector is not None and self._connector_owner:
954                await self._connector.close()
955            self._connector = None
956
957    @property
958    def closed(self) -> bool:
959        """Is client session closed.
960
961        A readonly property.
962        """
963        return self._connector is None or self._connector.closed
964
965    @property
966    def connector(self) -> Optional[BaseConnector]:
967        """Connector instance used for the session."""
968        return self._connector
969
970    @property
971    def cookie_jar(self) -> AbstractCookieJar:
972        """The session cookies."""
973        return self._cookie_jar
974
975    @property
976    def version(self) -> Tuple[int, int]:
977        """The session HTTP protocol version."""
978        return self._version
979
980    @property
981    def requote_redirect_url(self) -> bool:
982        """Do URL requoting on redirection handling."""
983        return self._requote_redirect_url
984
985    @requote_redirect_url.setter
986    def requote_redirect_url(self, val: bool) -> None:
987        """Do URL requoting on redirection handling."""
988        warnings.warn(
989            "session.requote_redirect_url modification " "is deprecated #2778",
990            DeprecationWarning,
991            stacklevel=2,
992        )
993        self._requote_redirect_url = val
994
995    @property
996    def loop(self) -> asyncio.AbstractEventLoop:
997        """Session's loop."""
998        warnings.warn(
999            "client.loop property is deprecated", DeprecationWarning, stacklevel=2
1000        )
1001        return self._loop
1002
1003    @property
1004    def timeout(self) -> Union[object, ClientTimeout]:
1005        """Timeout for the session."""
1006        return self._timeout
1007
1008    @property
1009    def headers(self) -> "CIMultiDict[str]":
1010        """The default headers of the client session."""
1011        return self._default_headers
1012
1013    @property
1014    def skip_auto_headers(self) -> FrozenSet[istr]:
1015        """Headers for which autogeneration should be skipped"""
1016        return self._skip_auto_headers
1017
1018    @property
1019    def auth(self) -> Optional[BasicAuth]:
1020        """An object that represents HTTP Basic Authorization"""
1021        return self._default_auth
1022
1023    @property
1024    def json_serialize(self) -> JSONEncoder:
1025        """Json serializer callable"""
1026        return self._json_serialize
1027
1028    @property
1029    def connector_owner(self) -> bool:
1030        """Should connector be closed on session closing"""
1031        return self._connector_owner
1032
1033    @property
1034    def raise_for_status(
1035        self,
1036    ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]:
1037        """
1038        Should `ClientResponse.raise_for_status()`
1039        be called for each response
1040        """
1041        return self._raise_for_status
1042
1043    @property
1044    def auto_decompress(self) -> bool:
1045        """Should the body response be automatically decompressed"""
1046        return self._auto_decompress
1047
1048    @property
1049    def trust_env(self) -> bool:
1050        """
1051        Should get proxies information
1052        from HTTP_PROXY / HTTPS_PROXY environment variables
1053        or ~/.netrc file if present
1054        """
1055        return self._trust_env
1056
1057    @property
1058    def trace_configs(self) -> List[TraceConfig]:
1059        """A list of TraceConfig instances used for client tracing"""
1060        return self._trace_configs
1061
1062    def detach(self) -> None:
1063        """Detach connector from session without closing the former.
1064
1065        Session is switched to closed state anyway.
1066        """
1067        self._connector = None
1068
1069    def __enter__(self) -> None:
1070        raise TypeError("Use async with instead")
1071
1072    def __exit__(
1073        self,
1074        exc_type: Optional[Type[BaseException]],
1075        exc_val: Optional[BaseException],
1076        exc_tb: Optional[TracebackType],
1077    ) -> None:
1078        # __exit__ should exist in pair with __enter__ but never executed
1079        pass  # pragma: no cover
1080
1081    async def __aenter__(self) -> "ClientSession":
1082        return self
1083
1084    async def __aexit__(
1085        self,
1086        exc_type: Optional[Type[BaseException]],
1087        exc_val: Optional[BaseException],
1088        exc_tb: Optional[TracebackType],
1089    ) -> None:
1090        await self.close()
1091
1092
1093class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]):
1094
1095    __slots__ = ("_coro", "_resp")
1096
1097    def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None:
1098        self._coro = coro
1099
1100    def send(self, arg: None) -> "asyncio.Future[Any]":
1101        return self._coro.send(arg)
1102
1103    def throw(self, arg: BaseException) -> None:  # type: ignore
1104        self._coro.throw(arg)
1105
1106    def close(self) -> None:
1107        return self._coro.close()
1108
1109    def __await__(self) -> Generator[Any, None, _RetType]:
1110        ret = self._coro.__await__()
1111        return ret
1112
1113    def __iter__(self) -> Generator[Any, None, _RetType]:
1114        return self.__await__()
1115
1116    async def __aenter__(self) -> _RetType:
1117        self._resp = await self._coro
1118        return self._resp
1119
1120
1121class _RequestContextManager(_BaseRequestContextManager[ClientResponse]):
1122    async def __aexit__(
1123        self,
1124        exc_type: Optional[Type[BaseException]],
1125        exc: Optional[BaseException],
1126        tb: Optional[TracebackType],
1127    ) -> None:
1128        # We're basing behavior on the exception as it can be caused by
1129        # user code unrelated to the status of the connection.  If you
1130        # would like to close a connection you must do that
1131        # explicitly.  Otherwise connection error handling should kick in
1132        # and close/recycle the connection as required.
1133        self._resp.release()
1134
1135
1136class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]):
1137    async def __aexit__(
1138        self,
1139        exc_type: Optional[Type[BaseException]],
1140        exc: Optional[BaseException],
1141        tb: Optional[TracebackType],
1142    ) -> None:
1143        await self._resp.close()
1144
1145
1146class _SessionRequestContextManager:
1147
1148    __slots__ = ("_coro", "_resp", "_session")
1149
1150    def __init__(
1151        self,
1152        coro: Coroutine["asyncio.Future[Any]", None, ClientResponse],
1153        session: ClientSession,
1154    ) -> None:
1155        self._coro = coro
1156        self._resp = None  # type: Optional[ClientResponse]
1157        self._session = session
1158
1159    async def __aenter__(self) -> ClientResponse:
1160        try:
1161            self._resp = await self._coro
1162        except BaseException:
1163            await self._session.close()
1164            raise
1165        else:
1166            return self._resp
1167
1168    async def __aexit__(
1169        self,
1170        exc_type: Optional[Type[BaseException]],
1171        exc: Optional[BaseException],
1172        tb: Optional[TracebackType],
1173    ) -> None:
1174        assert self._resp is not None
1175        self._resp.close()
1176        await self._session.close()
1177
1178
1179def request(
1180    method: str,
1181    url: StrOrURL,
1182    *,
1183    params: Optional[Mapping[str, str]] = None,
1184    data: Any = None,
1185    json: Any = None,
1186    headers: Optional[LooseHeaders] = None,
1187    skip_auto_headers: Optional[Iterable[str]] = None,
1188    auth: Optional[BasicAuth] = None,
1189    allow_redirects: bool = True,
1190    max_redirects: int = 10,
1191    compress: Optional[str] = None,
1192    chunked: Optional[bool] = None,
1193    expect100: bool = False,
1194    raise_for_status: Optional[bool] = None,
1195    read_until_eof: bool = True,
1196    proxy: Optional[StrOrURL] = None,
1197    proxy_auth: Optional[BasicAuth] = None,
1198    timeout: Union[ClientTimeout, object] = sentinel,
1199    cookies: Optional[LooseCookies] = None,
1200    version: HttpVersion = http.HttpVersion11,
1201    connector: Optional[BaseConnector] = None,
1202    read_bufsize: Optional[int] = None,
1203    loop: Optional[asyncio.AbstractEventLoop] = None,
1204) -> _SessionRequestContextManager:
1205    """Constructs and sends a request. Returns response object.
1206    method - HTTP method
1207    url - request url
1208    params - (optional) Dictionary or bytes to be sent in the query
1209      string of the new request
1210    data - (optional) Dictionary, bytes, or file-like object to
1211      send in the body of the request
1212    json - (optional) Any json compatible python object
1213    headers - (optional) Dictionary of HTTP Headers to send with
1214      the request
1215    cookies - (optional) Dict object to send with the request
1216    auth - (optional) BasicAuth named tuple represent HTTP Basic Auth
1217    auth - aiohttp.helpers.BasicAuth
1218    allow_redirects - (optional) If set to False, do not follow
1219      redirects
1220    version - Request HTTP version.
1221    compress - Set to True if request has to be compressed
1222       with deflate encoding.
1223    chunked - Set to chunk size for chunked transfer encoding.
1224    expect100 - Expect 100-continue response from server.
1225    connector - BaseConnector sub-class instance to support
1226       connection pooling.
1227    read_until_eof - Read response until eof if response
1228       does not have Content-Length header.
1229    loop - Optional event loop.
1230    timeout - Optional ClientTimeout settings structure, 5min
1231       total timeout by default.
1232    Usage::
1233      >>> import aiohttp
1234      >>> resp = await aiohttp.request('GET', 'http://python.org/')
1235      >>> resp
1236      <ClientResponse(python.org/) [200]>
1237      >>> data = await resp.read()
1238    """
1239    connector_owner = False
1240    if connector is None:
1241        connector_owner = True
1242        connector = TCPConnector(loop=loop, force_close=True)
1243
1244    session = ClientSession(
1245        loop=loop,
1246        cookies=cookies,
1247        version=version,
1248        timeout=timeout,
1249        connector=connector,
1250        connector_owner=connector_owner,
1251    )
1252
1253    return _SessionRequestContextManager(
1254        session._request(
1255            method,
1256            url,
1257            params=params,
1258            data=data,
1259            json=json,
1260            headers=headers,
1261            skip_auto_headers=skip_auto_headers,
1262            auth=auth,
1263            allow_redirects=allow_redirects,
1264            max_redirects=max_redirects,
1265            compress=compress,
1266            chunked=chunked,
1267            expect100=expect100,
1268            raise_for_status=raise_for_status,
1269            read_until_eof=read_until_eof,
1270            proxy=proxy,
1271            proxy_auth=proxy_auth,
1272            read_bufsize=read_bufsize,
1273        ),
1274        session,
1275    )
1276