1"""HTTP Client for asyncio.""" 2 3import asyncio 4import base64 5import hashlib 6import json 7import os 8import sys 9import traceback 10import warnings 11from types import SimpleNamespace, TracebackType 12from typing import ( 13 Any, 14 Awaitable, 15 Callable, 16 Coroutine, 17 FrozenSet, 18 Generator, 19 Generic, 20 Iterable, 21 List, 22 Mapping, 23 Optional, 24 Set, 25 Tuple, 26 Type, 27 TypeVar, 28 Union, 29) 30 31import attr 32from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr 33from yarl import URL 34 35from . import hdrs, http, payload 36from .abc import AbstractCookieJar 37from .client_exceptions import ( 38 ClientConnectionError as ClientConnectionError, 39 ClientConnectorCertificateError as ClientConnectorCertificateError, 40 ClientConnectorError as ClientConnectorError, 41 ClientConnectorSSLError as ClientConnectorSSLError, 42 ClientError as ClientError, 43 ClientHttpProxyError as ClientHttpProxyError, 44 ClientOSError as ClientOSError, 45 ClientPayloadError as ClientPayloadError, 46 ClientProxyConnectionError as ClientProxyConnectionError, 47 ClientResponseError as ClientResponseError, 48 ClientSSLError as ClientSSLError, 49 ContentTypeError as ContentTypeError, 50 InvalidURL as InvalidURL, 51 ServerConnectionError as ServerConnectionError, 52 ServerDisconnectedError as ServerDisconnectedError, 53 ServerFingerprintMismatch as ServerFingerprintMismatch, 54 ServerTimeoutError as ServerTimeoutError, 55 TooManyRedirects as TooManyRedirects, 56 WSServerHandshakeError as WSServerHandshakeError, 57) 58from .client_reqrep import ( 59 ClientRequest as ClientRequest, 60 ClientResponse as ClientResponse, 61 Fingerprint as Fingerprint, 62 RequestInfo as RequestInfo, 63 _merge_ssl_params, 64) 65from .client_ws import ClientWebSocketResponse as ClientWebSocketResponse 66from .connector import ( 67 BaseConnector as BaseConnector, 68 NamedPipeConnector as NamedPipeConnector, 69 TCPConnector as TCPConnector, 70 UnixConnector as UnixConnector, 71) 72from .cookiejar import CookieJar 73from .helpers import ( 74 DEBUG, 75 PY_36, 76 BasicAuth, 77 CeilTimeout, 78 TimeoutHandle, 79 get_running_loop, 80 proxies_from_env, 81 sentinel, 82 strip_auth_from_url, 83) 84from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter 85from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse 86from .streams import FlowControlDataQueue 87from .tracing import Trace, TraceConfig 88from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL 89 90__all__ = ( 91 # client_exceptions 92 "ClientConnectionError", 93 "ClientConnectorCertificateError", 94 "ClientConnectorError", 95 "ClientConnectorSSLError", 96 "ClientError", 97 "ClientHttpProxyError", 98 "ClientOSError", 99 "ClientPayloadError", 100 "ClientProxyConnectionError", 101 "ClientResponseError", 102 "ClientSSLError", 103 "ContentTypeError", 104 "InvalidURL", 105 "ServerConnectionError", 106 "ServerDisconnectedError", 107 "ServerFingerprintMismatch", 108 "ServerTimeoutError", 109 "TooManyRedirects", 110 "WSServerHandshakeError", 111 # client_reqrep 112 "ClientRequest", 113 "ClientResponse", 114 "Fingerprint", 115 "RequestInfo", 116 # connector 117 "BaseConnector", 118 "TCPConnector", 119 "UnixConnector", 120 "NamedPipeConnector", 121 # client_ws 122 "ClientWebSocketResponse", 123 # client 124 "ClientSession", 125 "ClientTimeout", 126 "request", 127) 128 129 130try: 131 from ssl import SSLContext 132except ImportError: # pragma: no cover 133 SSLContext = object # type: ignore 134 135 136@attr.s(auto_attribs=True, frozen=True, slots=True) 137class ClientTimeout: 138 total: Optional[float] = None 139 connect: Optional[float] = None 140 sock_read: Optional[float] = None 141 sock_connect: Optional[float] = None 142 143 # pool_queue_timeout: Optional[float] = None 144 # dns_resolution_timeout: Optional[float] = None 145 # socket_connect_timeout: Optional[float] = None 146 # connection_acquiring_timeout: Optional[float] = None 147 # new_connection_timeout: Optional[float] = None 148 # http_header_timeout: Optional[float] = None 149 # response_body_timeout: Optional[float] = None 150 151 # to create a timeout specific for a single request, either 152 # - create a completely new one to overwrite the default 153 # - or use http://www.attrs.org/en/stable/api.html#attr.evolve 154 # to overwrite the defaults 155 156 157# 5 Minute default read timeout 158DEFAULT_TIMEOUT = ClientTimeout(total=5 * 60) 159 160_RetType = TypeVar("_RetType") 161 162 163class ClientSession: 164 """First-class interface for making HTTP requests.""" 165 166 ATTRS = frozenset( 167 [ 168 "_source_traceback", 169 "_connector", 170 "requote_redirect_url", 171 "_loop", 172 "_cookie_jar", 173 "_connector_owner", 174 "_default_auth", 175 "_version", 176 "_json_serialize", 177 "_requote_redirect_url", 178 "_timeout", 179 "_raise_for_status", 180 "_auto_decompress", 181 "_trust_env", 182 "_default_headers", 183 "_skip_auto_headers", 184 "_request_class", 185 "_response_class", 186 "_ws_response_class", 187 "_trace_configs", 188 "_read_bufsize", 189 ] 190 ) 191 192 _source_traceback = None 193 194 def __init__( 195 self, 196 *, 197 connector: Optional[BaseConnector] = None, 198 loop: Optional[asyncio.AbstractEventLoop] = None, 199 cookies: Optional[LooseCookies] = None, 200 headers: Optional[LooseHeaders] = None, 201 skip_auto_headers: Optional[Iterable[str]] = None, 202 auth: Optional[BasicAuth] = None, 203 json_serialize: JSONEncoder = json.dumps, 204 request_class: Type[ClientRequest] = ClientRequest, 205 response_class: Type[ClientResponse] = ClientResponse, 206 ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse, 207 version: HttpVersion = http.HttpVersion11, 208 cookie_jar: Optional[AbstractCookieJar] = None, 209 connector_owner: bool = True, 210 raise_for_status: bool = False, 211 read_timeout: Union[float, object] = sentinel, 212 conn_timeout: Optional[float] = None, 213 timeout: Union[object, ClientTimeout] = sentinel, 214 auto_decompress: bool = True, 215 trust_env: bool = False, 216 requote_redirect_url: bool = True, 217 trace_configs: Optional[List[TraceConfig]] = None, 218 read_bufsize: int = 2 ** 16, 219 ) -> None: 220 221 if loop is None: 222 if connector is not None: 223 loop = connector._loop 224 225 loop = get_running_loop(loop) 226 227 if connector is None: 228 connector = TCPConnector(loop=loop) 229 230 if connector._loop is not loop: 231 raise RuntimeError("Session and connector has to use same event loop") 232 233 self._loop = loop 234 235 if loop.get_debug(): 236 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 237 238 if cookie_jar is None: 239 cookie_jar = CookieJar(loop=loop) 240 self._cookie_jar = cookie_jar 241 242 if cookies is not None: 243 self._cookie_jar.update_cookies(cookies) 244 245 self._connector = connector # type: Optional[BaseConnector] 246 self._connector_owner = connector_owner 247 self._default_auth = auth 248 self._version = version 249 self._json_serialize = json_serialize 250 if timeout is sentinel: 251 self._timeout = DEFAULT_TIMEOUT 252 if read_timeout is not sentinel: 253 warnings.warn( 254 "read_timeout is deprecated, " "use timeout argument instead", 255 DeprecationWarning, 256 stacklevel=2, 257 ) 258 self._timeout = attr.evolve(self._timeout, total=read_timeout) 259 if conn_timeout is not None: 260 self._timeout = attr.evolve(self._timeout, connect=conn_timeout) 261 warnings.warn( 262 "conn_timeout is deprecated, " "use timeout argument instead", 263 DeprecationWarning, 264 stacklevel=2, 265 ) 266 else: 267 self._timeout = timeout # type: ignore 268 if read_timeout is not sentinel: 269 raise ValueError( 270 "read_timeout and timeout parameters " 271 "conflict, please setup " 272 "timeout.read" 273 ) 274 if conn_timeout is not None: 275 raise ValueError( 276 "conn_timeout and timeout parameters " 277 "conflict, please setup " 278 "timeout.connect" 279 ) 280 self._raise_for_status = raise_for_status 281 self._auto_decompress = auto_decompress 282 self._trust_env = trust_env 283 self._requote_redirect_url = requote_redirect_url 284 self._read_bufsize = read_bufsize 285 286 # Convert to list of tuples 287 if headers: 288 real_headers = CIMultiDict(headers) # type: CIMultiDict[str] 289 else: 290 real_headers = CIMultiDict() 291 self._default_headers = real_headers # type: CIMultiDict[str] 292 if skip_auto_headers is not None: 293 self._skip_auto_headers = frozenset([istr(i) for i in skip_auto_headers]) 294 else: 295 self._skip_auto_headers = frozenset() 296 297 self._request_class = request_class 298 self._response_class = response_class 299 self._ws_response_class = ws_response_class 300 301 self._trace_configs = trace_configs or [] 302 for trace_config in self._trace_configs: 303 trace_config.freeze() 304 305 def __init_subclass__(cls: Type["ClientSession"]) -> None: 306 warnings.warn( 307 "Inheritance class {} from ClientSession " 308 "is discouraged".format(cls.__name__), 309 DeprecationWarning, 310 stacklevel=2, 311 ) 312 313 if DEBUG: 314 315 def __setattr__(self, name: str, val: Any) -> None: 316 if name not in self.ATTRS: 317 warnings.warn( 318 "Setting custom ClientSession.{} attribute " 319 "is discouraged".format(name), 320 DeprecationWarning, 321 stacklevel=2, 322 ) 323 super().__setattr__(name, val) 324 325 def __del__(self, _warnings: Any = warnings) -> None: 326 if not self.closed: 327 if PY_36: 328 kwargs = {"source": self} 329 else: 330 kwargs = {} 331 _warnings.warn( 332 f"Unclosed client session {self!r}", ResourceWarning, **kwargs 333 ) 334 context = {"client_session": self, "message": "Unclosed client session"} 335 if self._source_traceback is not None: 336 context["source_traceback"] = self._source_traceback 337 self._loop.call_exception_handler(context) 338 339 def request( 340 self, method: str, url: StrOrURL, **kwargs: Any 341 ) -> "_RequestContextManager": 342 """Perform HTTP request.""" 343 return _RequestContextManager(self._request(method, url, **kwargs)) 344 345 async def _request( 346 self, 347 method: str, 348 str_or_url: StrOrURL, 349 *, 350 params: Optional[Mapping[str, str]] = None, 351 data: Any = None, 352 json: Any = None, 353 cookies: Optional[LooseCookies] = None, 354 headers: Optional[LooseHeaders] = None, 355 skip_auto_headers: Optional[Iterable[str]] = None, 356 auth: Optional[BasicAuth] = None, 357 allow_redirects: bool = True, 358 max_redirects: int = 10, 359 compress: Optional[str] = None, 360 chunked: Optional[bool] = None, 361 expect100: bool = False, 362 raise_for_status: Optional[bool] = None, 363 read_until_eof: bool = True, 364 proxy: Optional[StrOrURL] = None, 365 proxy_auth: Optional[BasicAuth] = None, 366 timeout: Union[ClientTimeout, object] = sentinel, 367 verify_ssl: Optional[bool] = None, 368 fingerprint: Optional[bytes] = None, 369 ssl_context: Optional[SSLContext] = None, 370 ssl: Optional[Union[SSLContext, bool, Fingerprint]] = None, 371 proxy_headers: Optional[LooseHeaders] = None, 372 trace_request_ctx: Optional[SimpleNamespace] = None, 373 read_bufsize: Optional[int] = None, 374 ) -> ClientResponse: 375 376 # NOTE: timeout clamps existing connect and read timeouts. We cannot 377 # set the default to None because we need to detect if the user wants 378 # to use the existing timeouts by setting timeout to None. 379 380 if self.closed: 381 raise RuntimeError("Session is closed") 382 383 ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 384 385 if data is not None and json is not None: 386 raise ValueError( 387 "data and json parameters can not be used at the same time" 388 ) 389 elif json is not None: 390 data = payload.JsonPayload(json, dumps=self._json_serialize) 391 392 if not isinstance(chunked, bool) and chunked is not None: 393 warnings.warn("Chunk size is deprecated #1615", DeprecationWarning) 394 395 redirects = 0 396 history = [] 397 version = self._version 398 399 # Merge with default headers and transform to CIMultiDict 400 headers = self._prepare_headers(headers) 401 proxy_headers = self._prepare_headers(proxy_headers) 402 403 try: 404 url = URL(str_or_url) 405 except ValueError as e: 406 raise InvalidURL(str_or_url) from e 407 408 skip_headers = set(self._skip_auto_headers) 409 if skip_auto_headers is not None: 410 for i in skip_auto_headers: 411 skip_headers.add(istr(i)) 412 413 if proxy is not None: 414 try: 415 proxy = URL(proxy) 416 except ValueError as e: 417 raise InvalidURL(proxy) from e 418 419 if timeout is sentinel: 420 real_timeout = self._timeout # type: ClientTimeout 421 else: 422 if not isinstance(timeout, ClientTimeout): 423 real_timeout = ClientTimeout(total=timeout) # type: ignore 424 else: 425 real_timeout = timeout 426 # timeout is cumulative for all request operations 427 # (request, redirects, responses, data consuming) 428 tm = TimeoutHandle(self._loop, real_timeout.total) 429 handle = tm.start() 430 431 if read_bufsize is None: 432 read_bufsize = self._read_bufsize 433 434 traces = [ 435 Trace( 436 self, 437 trace_config, 438 trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx), 439 ) 440 for trace_config in self._trace_configs 441 ] 442 443 for trace in traces: 444 await trace.send_request_start(method, url, headers) 445 446 timer = tm.timer() 447 try: 448 with timer: 449 while True: 450 url, auth_from_url = strip_auth_from_url(url) 451 if auth and auth_from_url: 452 raise ValueError( 453 "Cannot combine AUTH argument with " 454 "credentials encoded in URL" 455 ) 456 457 if auth is None: 458 auth = auth_from_url 459 if auth is None: 460 auth = self._default_auth 461 # It would be confusing if we support explicit 462 # Authorization header with auth argument 463 if ( 464 headers is not None 465 and auth is not None 466 and hdrs.AUTHORIZATION in headers 467 ): 468 raise ValueError( 469 "Cannot combine AUTHORIZATION header " 470 "with AUTH argument or credentials " 471 "encoded in URL" 472 ) 473 474 all_cookies = self._cookie_jar.filter_cookies(url) 475 476 if cookies is not None: 477 tmp_cookie_jar = CookieJar() 478 tmp_cookie_jar.update_cookies(cookies) 479 req_cookies = tmp_cookie_jar.filter_cookies(url) 480 if req_cookies: 481 all_cookies.load(req_cookies) 482 483 if proxy is not None: 484 proxy = URL(proxy) 485 elif self._trust_env: 486 for scheme, proxy_info in proxies_from_env().items(): 487 if scheme == url.scheme: 488 proxy = proxy_info.proxy 489 proxy_auth = proxy_info.proxy_auth 490 break 491 492 req = self._request_class( 493 method, 494 url, 495 params=params, 496 headers=headers, 497 skip_auto_headers=skip_headers, 498 data=data, 499 cookies=all_cookies, 500 auth=auth, 501 version=version, 502 compress=compress, 503 chunked=chunked, 504 expect100=expect100, 505 loop=self._loop, 506 response_class=self._response_class, 507 proxy=proxy, 508 proxy_auth=proxy_auth, 509 timer=timer, 510 session=self, 511 ssl=ssl, 512 proxy_headers=proxy_headers, 513 traces=traces, 514 ) 515 516 # connection timeout 517 try: 518 with CeilTimeout(real_timeout.connect, loop=self._loop): 519 assert self._connector is not None 520 conn = await self._connector.connect( 521 req, traces=traces, timeout=real_timeout 522 ) 523 except asyncio.TimeoutError as exc: 524 raise ServerTimeoutError( 525 "Connection timeout " "to host {}".format(url) 526 ) from exc 527 528 assert conn.transport is not None 529 530 assert conn.protocol is not None 531 conn.protocol.set_response_params( 532 timer=timer, 533 skip_payload=method.upper() == "HEAD", 534 read_until_eof=read_until_eof, 535 auto_decompress=self._auto_decompress, 536 read_timeout=real_timeout.sock_read, 537 read_bufsize=read_bufsize, 538 ) 539 540 try: 541 try: 542 resp = await req.send(conn) 543 try: 544 await resp.start(conn) 545 except BaseException: 546 resp.close() 547 raise 548 except BaseException: 549 conn.close() 550 raise 551 except ClientError: 552 raise 553 except OSError as exc: 554 raise ClientOSError(*exc.args) from exc 555 556 self._cookie_jar.update_cookies(resp.cookies, resp.url) 557 558 # redirects 559 if resp.status in (301, 302, 303, 307, 308) and allow_redirects: 560 561 for trace in traces: 562 await trace.send_request_redirect( 563 method, url, headers, resp 564 ) 565 566 redirects += 1 567 history.append(resp) 568 if max_redirects and redirects >= max_redirects: 569 resp.close() 570 raise TooManyRedirects( 571 history[0].request_info, tuple(history) 572 ) 573 574 # For 301 and 302, mimic IE, now changed in RFC 575 # https://github.com/kennethreitz/requests/pull/269 576 if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or ( 577 resp.status in (301, 302) and resp.method == hdrs.METH_POST 578 ): 579 method = hdrs.METH_GET 580 data = None 581 if headers.get(hdrs.CONTENT_LENGTH): 582 headers.pop(hdrs.CONTENT_LENGTH) 583 584 r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get( 585 hdrs.URI 586 ) 587 if r_url is None: 588 # see github.com/aio-libs/aiohttp/issues/2022 589 break 590 else: 591 # reading from correct redirection 592 # response is forbidden 593 resp.release() 594 595 try: 596 parsed_url = URL( 597 r_url, encoded=not self._requote_redirect_url 598 ) 599 600 except ValueError as e: 601 raise InvalidURL(r_url) from e 602 603 scheme = parsed_url.scheme 604 if scheme not in ("http", "https", ""): 605 resp.close() 606 raise ValueError("Can redirect only to http or https") 607 elif not scheme: 608 parsed_url = url.join(parsed_url) 609 610 if url.origin() != parsed_url.origin(): 611 auth = None 612 headers.pop(hdrs.AUTHORIZATION, None) 613 614 url = parsed_url 615 params = None 616 resp.release() 617 continue 618 619 break 620 621 # check response status 622 if raise_for_status is None: 623 raise_for_status = self._raise_for_status 624 if raise_for_status: 625 resp.raise_for_status() 626 627 # register connection 628 if handle is not None: 629 if resp.connection is not None: 630 resp.connection.add_callback(handle.cancel) 631 else: 632 handle.cancel() 633 634 resp._history = tuple(history) 635 636 for trace in traces: 637 await trace.send_request_end(method, url, headers, resp) 638 return resp 639 640 except BaseException as e: 641 # cleanup timer 642 tm.close() 643 if handle: 644 handle.cancel() 645 handle = None 646 647 for trace in traces: 648 await trace.send_request_exception(method, url, headers, e) 649 raise 650 651 def ws_connect( 652 self, 653 url: StrOrURL, 654 *, 655 method: str = hdrs.METH_GET, 656 protocols: Iterable[str] = (), 657 timeout: float = 10.0, 658 receive_timeout: Optional[float] = None, 659 autoclose: bool = True, 660 autoping: bool = True, 661 heartbeat: Optional[float] = None, 662 auth: Optional[BasicAuth] = None, 663 origin: Optional[str] = None, 664 headers: Optional[LooseHeaders] = None, 665 proxy: Optional[StrOrURL] = None, 666 proxy_auth: Optional[BasicAuth] = None, 667 ssl: Union[SSLContext, bool, None, Fingerprint] = None, 668 verify_ssl: Optional[bool] = None, 669 fingerprint: Optional[bytes] = None, 670 ssl_context: Optional[SSLContext] = None, 671 proxy_headers: Optional[LooseHeaders] = None, 672 compress: int = 0, 673 max_msg_size: int = 4 * 1024 * 1024, 674 ) -> "_WSRequestContextManager": 675 """Initiate websocket connection.""" 676 return _WSRequestContextManager( 677 self._ws_connect( 678 url, 679 method=method, 680 protocols=protocols, 681 timeout=timeout, 682 receive_timeout=receive_timeout, 683 autoclose=autoclose, 684 autoping=autoping, 685 heartbeat=heartbeat, 686 auth=auth, 687 origin=origin, 688 headers=headers, 689 proxy=proxy, 690 proxy_auth=proxy_auth, 691 ssl=ssl, 692 verify_ssl=verify_ssl, 693 fingerprint=fingerprint, 694 ssl_context=ssl_context, 695 proxy_headers=proxy_headers, 696 compress=compress, 697 max_msg_size=max_msg_size, 698 ) 699 ) 700 701 async def _ws_connect( 702 self, 703 url: StrOrURL, 704 *, 705 method: str = hdrs.METH_GET, 706 protocols: Iterable[str] = (), 707 timeout: float = 10.0, 708 receive_timeout: Optional[float] = None, 709 autoclose: bool = True, 710 autoping: bool = True, 711 heartbeat: Optional[float] = None, 712 auth: Optional[BasicAuth] = None, 713 origin: Optional[str] = None, 714 headers: Optional[LooseHeaders] = None, 715 proxy: Optional[StrOrURL] = None, 716 proxy_auth: Optional[BasicAuth] = None, 717 ssl: Union[SSLContext, bool, None, Fingerprint] = None, 718 verify_ssl: Optional[bool] = None, 719 fingerprint: Optional[bytes] = None, 720 ssl_context: Optional[SSLContext] = None, 721 proxy_headers: Optional[LooseHeaders] = None, 722 compress: int = 0, 723 max_msg_size: int = 4 * 1024 * 1024, 724 ) -> ClientWebSocketResponse: 725 726 if headers is None: 727 real_headers = CIMultiDict() # type: CIMultiDict[str] 728 else: 729 real_headers = CIMultiDict(headers) 730 731 default_headers = { 732 hdrs.UPGRADE: "websocket", 733 hdrs.CONNECTION: "upgrade", 734 hdrs.SEC_WEBSOCKET_VERSION: "13", 735 } 736 737 for key, value in default_headers.items(): 738 real_headers.setdefault(key, value) 739 740 sec_key = base64.b64encode(os.urandom(16)) 741 real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode() 742 743 if protocols: 744 real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols) 745 if origin is not None: 746 real_headers[hdrs.ORIGIN] = origin 747 if compress: 748 extstr = ws_ext_gen(compress=compress) 749 real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr 750 751 ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 752 753 # send request 754 resp = await self.request( 755 method, 756 url, 757 headers=real_headers, 758 read_until_eof=False, 759 auth=auth, 760 proxy=proxy, 761 proxy_auth=proxy_auth, 762 ssl=ssl, 763 proxy_headers=proxy_headers, 764 ) 765 766 try: 767 # check handshake 768 if resp.status != 101: 769 raise WSServerHandshakeError( 770 resp.request_info, 771 resp.history, 772 message="Invalid response status", 773 status=resp.status, 774 headers=resp.headers, 775 ) 776 777 if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket": 778 raise WSServerHandshakeError( 779 resp.request_info, 780 resp.history, 781 message="Invalid upgrade header", 782 status=resp.status, 783 headers=resp.headers, 784 ) 785 786 if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade": 787 raise WSServerHandshakeError( 788 resp.request_info, 789 resp.history, 790 message="Invalid connection header", 791 status=resp.status, 792 headers=resp.headers, 793 ) 794 795 # key calculation 796 r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "") 797 match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode() 798 if r_key != match: 799 raise WSServerHandshakeError( 800 resp.request_info, 801 resp.history, 802 message="Invalid challenge response", 803 status=resp.status, 804 headers=resp.headers, 805 ) 806 807 # websocket protocol 808 protocol = None 809 if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers: 810 resp_protocols = [ 811 proto.strip() 812 for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",") 813 ] 814 815 for proto in resp_protocols: 816 if proto in protocols: 817 protocol = proto 818 break 819 820 # websocket compress 821 notakeover = False 822 if compress: 823 compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS) 824 if compress_hdrs: 825 try: 826 compress, notakeover = ws_ext_parse(compress_hdrs) 827 except WSHandshakeError as exc: 828 raise WSServerHandshakeError( 829 resp.request_info, 830 resp.history, 831 message=exc.args[0], 832 status=resp.status, 833 headers=resp.headers, 834 ) from exc 835 else: 836 compress = 0 837 notakeover = False 838 839 conn = resp.connection 840 assert conn is not None 841 conn_proto = conn.protocol 842 assert conn_proto is not None 843 transport = conn.transport 844 assert transport is not None 845 reader = FlowControlDataQueue( 846 conn_proto, 2 ** 16, loop=self._loop 847 ) # type: FlowControlDataQueue[WSMessage] 848 conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader) 849 writer = WebSocketWriter( 850 conn_proto, 851 transport, 852 use_mask=True, 853 compress=compress, 854 notakeover=notakeover, 855 ) 856 except BaseException: 857 resp.close() 858 raise 859 else: 860 return self._ws_response_class( 861 reader, 862 writer, 863 protocol, 864 resp, 865 timeout, 866 autoclose, 867 autoping, 868 self._loop, 869 receive_timeout=receive_timeout, 870 heartbeat=heartbeat, 871 compress=compress, 872 client_notakeover=notakeover, 873 ) 874 875 def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]": 876 """Add default headers and transform it to CIMultiDict""" 877 # Convert headers to MultiDict 878 result = CIMultiDict(self._default_headers) 879 if headers: 880 if not isinstance(headers, (MultiDictProxy, MultiDict)): 881 headers = CIMultiDict(headers) 882 added_names = set() # type: Set[str] 883 for key, value in headers.items(): 884 if key in added_names: 885 result.add(key, value) 886 else: 887 result[key] = value 888 added_names.add(key) 889 return result 890 891 def get( 892 self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any 893 ) -> "_RequestContextManager": 894 """Perform HTTP GET request.""" 895 return _RequestContextManager( 896 self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs) 897 ) 898 899 def options( 900 self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any 901 ) -> "_RequestContextManager": 902 """Perform HTTP OPTIONS request.""" 903 return _RequestContextManager( 904 self._request( 905 hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs 906 ) 907 ) 908 909 def head( 910 self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any 911 ) -> "_RequestContextManager": 912 """Perform HTTP HEAD request.""" 913 return _RequestContextManager( 914 self._request( 915 hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs 916 ) 917 ) 918 919 def post( 920 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 921 ) -> "_RequestContextManager": 922 """Perform HTTP POST request.""" 923 return _RequestContextManager( 924 self._request(hdrs.METH_POST, url, data=data, **kwargs) 925 ) 926 927 def put( 928 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 929 ) -> "_RequestContextManager": 930 """Perform HTTP PUT request.""" 931 return _RequestContextManager( 932 self._request(hdrs.METH_PUT, url, data=data, **kwargs) 933 ) 934 935 def patch( 936 self, url: StrOrURL, *, data: Any = None, **kwargs: Any 937 ) -> "_RequestContextManager": 938 """Perform HTTP PATCH request.""" 939 return _RequestContextManager( 940 self._request(hdrs.METH_PATCH, url, data=data, **kwargs) 941 ) 942 943 def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager": 944 """Perform HTTP DELETE request.""" 945 return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs)) 946 947 async def close(self) -> None: 948 """Close underlying connector. 949 950 Release all acquired resources. 951 """ 952 if not self.closed: 953 if self._connector is not None and self._connector_owner: 954 await self._connector.close() 955 self._connector = None 956 957 @property 958 def closed(self) -> bool: 959 """Is client session closed. 960 961 A readonly property. 962 """ 963 return self._connector is None or self._connector.closed 964 965 @property 966 def connector(self) -> Optional[BaseConnector]: 967 """Connector instance used for the session.""" 968 return self._connector 969 970 @property 971 def cookie_jar(self) -> AbstractCookieJar: 972 """The session cookies.""" 973 return self._cookie_jar 974 975 @property 976 def version(self) -> Tuple[int, int]: 977 """The session HTTP protocol version.""" 978 return self._version 979 980 @property 981 def requote_redirect_url(self) -> bool: 982 """Do URL requoting on redirection handling.""" 983 return self._requote_redirect_url 984 985 @requote_redirect_url.setter 986 def requote_redirect_url(self, val: bool) -> None: 987 """Do URL requoting on redirection handling.""" 988 warnings.warn( 989 "session.requote_redirect_url modification " "is deprecated #2778", 990 DeprecationWarning, 991 stacklevel=2, 992 ) 993 self._requote_redirect_url = val 994 995 @property 996 def loop(self) -> asyncio.AbstractEventLoop: 997 """Session's loop.""" 998 warnings.warn( 999 "client.loop property is deprecated", DeprecationWarning, stacklevel=2 1000 ) 1001 return self._loop 1002 1003 @property 1004 def timeout(self) -> Union[object, ClientTimeout]: 1005 """Timeout for the session.""" 1006 return self._timeout 1007 1008 @property 1009 def headers(self) -> "CIMultiDict[str]": 1010 """The default headers of the client session.""" 1011 return self._default_headers 1012 1013 @property 1014 def skip_auto_headers(self) -> FrozenSet[istr]: 1015 """Headers for which autogeneration should be skipped""" 1016 return self._skip_auto_headers 1017 1018 @property 1019 def auth(self) -> Optional[BasicAuth]: 1020 """An object that represents HTTP Basic Authorization""" 1021 return self._default_auth 1022 1023 @property 1024 def json_serialize(self) -> JSONEncoder: 1025 """Json serializer callable""" 1026 return self._json_serialize 1027 1028 @property 1029 def connector_owner(self) -> bool: 1030 """Should connector be closed on session closing""" 1031 return self._connector_owner 1032 1033 @property 1034 def raise_for_status( 1035 self, 1036 ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]: 1037 """ 1038 Should `ClientResponse.raise_for_status()` 1039 be called for each response 1040 """ 1041 return self._raise_for_status 1042 1043 @property 1044 def auto_decompress(self) -> bool: 1045 """Should the body response be automatically decompressed""" 1046 return self._auto_decompress 1047 1048 @property 1049 def trust_env(self) -> bool: 1050 """ 1051 Should get proxies information 1052 from HTTP_PROXY / HTTPS_PROXY environment variables 1053 or ~/.netrc file if present 1054 """ 1055 return self._trust_env 1056 1057 @property 1058 def trace_configs(self) -> List[TraceConfig]: 1059 """A list of TraceConfig instances used for client tracing""" 1060 return self._trace_configs 1061 1062 def detach(self) -> None: 1063 """Detach connector from session without closing the former. 1064 1065 Session is switched to closed state anyway. 1066 """ 1067 self._connector = None 1068 1069 def __enter__(self) -> None: 1070 raise TypeError("Use async with instead") 1071 1072 def __exit__( 1073 self, 1074 exc_type: Optional[Type[BaseException]], 1075 exc_val: Optional[BaseException], 1076 exc_tb: Optional[TracebackType], 1077 ) -> None: 1078 # __exit__ should exist in pair with __enter__ but never executed 1079 pass # pragma: no cover 1080 1081 async def __aenter__(self) -> "ClientSession": 1082 return self 1083 1084 async def __aexit__( 1085 self, 1086 exc_type: Optional[Type[BaseException]], 1087 exc_val: Optional[BaseException], 1088 exc_tb: Optional[TracebackType], 1089 ) -> None: 1090 await self.close() 1091 1092 1093class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]): 1094 1095 __slots__ = ("_coro", "_resp") 1096 1097 def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None: 1098 self._coro = coro 1099 1100 def send(self, arg: None) -> "asyncio.Future[Any]": 1101 return self._coro.send(arg) 1102 1103 def throw(self, arg: BaseException) -> None: # type: ignore 1104 self._coro.throw(arg) 1105 1106 def close(self) -> None: 1107 return self._coro.close() 1108 1109 def __await__(self) -> Generator[Any, None, _RetType]: 1110 ret = self._coro.__await__() 1111 return ret 1112 1113 def __iter__(self) -> Generator[Any, None, _RetType]: 1114 return self.__await__() 1115 1116 async def __aenter__(self) -> _RetType: 1117 self._resp = await self._coro 1118 return self._resp 1119 1120 1121class _RequestContextManager(_BaseRequestContextManager[ClientResponse]): 1122 async def __aexit__( 1123 self, 1124 exc_type: Optional[Type[BaseException]], 1125 exc: Optional[BaseException], 1126 tb: Optional[TracebackType], 1127 ) -> None: 1128 # We're basing behavior on the exception as it can be caused by 1129 # user code unrelated to the status of the connection. If you 1130 # would like to close a connection you must do that 1131 # explicitly. Otherwise connection error handling should kick in 1132 # and close/recycle the connection as required. 1133 self._resp.release() 1134 1135 1136class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]): 1137 async def __aexit__( 1138 self, 1139 exc_type: Optional[Type[BaseException]], 1140 exc: Optional[BaseException], 1141 tb: Optional[TracebackType], 1142 ) -> None: 1143 await self._resp.close() 1144 1145 1146class _SessionRequestContextManager: 1147 1148 __slots__ = ("_coro", "_resp", "_session") 1149 1150 def __init__( 1151 self, 1152 coro: Coroutine["asyncio.Future[Any]", None, ClientResponse], 1153 session: ClientSession, 1154 ) -> None: 1155 self._coro = coro 1156 self._resp = None # type: Optional[ClientResponse] 1157 self._session = session 1158 1159 async def __aenter__(self) -> ClientResponse: 1160 try: 1161 self._resp = await self._coro 1162 except BaseException: 1163 await self._session.close() 1164 raise 1165 else: 1166 return self._resp 1167 1168 async def __aexit__( 1169 self, 1170 exc_type: Optional[Type[BaseException]], 1171 exc: Optional[BaseException], 1172 tb: Optional[TracebackType], 1173 ) -> None: 1174 assert self._resp is not None 1175 self._resp.close() 1176 await self._session.close() 1177 1178 1179def request( 1180 method: str, 1181 url: StrOrURL, 1182 *, 1183 params: Optional[Mapping[str, str]] = None, 1184 data: Any = None, 1185 json: Any = None, 1186 headers: Optional[LooseHeaders] = None, 1187 skip_auto_headers: Optional[Iterable[str]] = None, 1188 auth: Optional[BasicAuth] = None, 1189 allow_redirects: bool = True, 1190 max_redirects: int = 10, 1191 compress: Optional[str] = None, 1192 chunked: Optional[bool] = None, 1193 expect100: bool = False, 1194 raise_for_status: Optional[bool] = None, 1195 read_until_eof: bool = True, 1196 proxy: Optional[StrOrURL] = None, 1197 proxy_auth: Optional[BasicAuth] = None, 1198 timeout: Union[ClientTimeout, object] = sentinel, 1199 cookies: Optional[LooseCookies] = None, 1200 version: HttpVersion = http.HttpVersion11, 1201 connector: Optional[BaseConnector] = None, 1202 read_bufsize: Optional[int] = None, 1203 loop: Optional[asyncio.AbstractEventLoop] = None, 1204) -> _SessionRequestContextManager: 1205 """Constructs and sends a request. Returns response object. 1206 method - HTTP method 1207 url - request url 1208 params - (optional) Dictionary or bytes to be sent in the query 1209 string of the new request 1210 data - (optional) Dictionary, bytes, or file-like object to 1211 send in the body of the request 1212 json - (optional) Any json compatible python object 1213 headers - (optional) Dictionary of HTTP Headers to send with 1214 the request 1215 cookies - (optional) Dict object to send with the request 1216 auth - (optional) BasicAuth named tuple represent HTTP Basic Auth 1217 auth - aiohttp.helpers.BasicAuth 1218 allow_redirects - (optional) If set to False, do not follow 1219 redirects 1220 version - Request HTTP version. 1221 compress - Set to True if request has to be compressed 1222 with deflate encoding. 1223 chunked - Set to chunk size for chunked transfer encoding. 1224 expect100 - Expect 100-continue response from server. 1225 connector - BaseConnector sub-class instance to support 1226 connection pooling. 1227 read_until_eof - Read response until eof if response 1228 does not have Content-Length header. 1229 loop - Optional event loop. 1230 timeout - Optional ClientTimeout settings structure, 5min 1231 total timeout by default. 1232 Usage:: 1233 >>> import aiohttp 1234 >>> resp = await aiohttp.request('GET', 'http://python.org/') 1235 >>> resp 1236 <ClientResponse(python.org/) [200]> 1237 >>> data = await resp.read() 1238 """ 1239 connector_owner = False 1240 if connector is None: 1241 connector_owner = True 1242 connector = TCPConnector(loop=loop, force_close=True) 1243 1244 session = ClientSession( 1245 loop=loop, 1246 cookies=cookies, 1247 version=version, 1248 timeout=timeout, 1249 connector=connector, 1250 connector_owner=connector_owner, 1251 ) 1252 1253 return _SessionRequestContextManager( 1254 session._request( 1255 method, 1256 url, 1257 params=params, 1258 data=data, 1259 json=json, 1260 headers=headers, 1261 skip_auto_headers=skip_auto_headers, 1262 auth=auth, 1263 allow_redirects=allow_redirects, 1264 max_redirects=max_redirects, 1265 compress=compress, 1266 chunked=chunked, 1267 expect100=expect100, 1268 raise_for_status=raise_for_status, 1269 read_until_eof=read_until_eof, 1270 proxy=proxy, 1271 proxy_auth=proxy_auth, 1272 read_bufsize=read_bufsize, 1273 ), 1274 session, 1275 ) 1276