1import asyncio 2import functools 3import random 4import sys 5import traceback 6import warnings 7from collections import defaultdict, deque 8from contextlib import suppress 9from http.cookies import SimpleCookie 10from itertools import cycle, islice 11from time import monotonic 12from types import TracebackType 13from typing import ( 14 TYPE_CHECKING, 15 Any, 16 Awaitable, 17 Callable, 18 DefaultDict, 19 Dict, 20 Iterator, 21 List, 22 Optional, 23 Set, 24 Tuple, 25 Type, 26 Union, 27 cast, 28) 29 30import attr 31 32from . import hdrs, helpers 33from .abc import AbstractResolver 34from .client_exceptions import ( 35 ClientConnectionError, 36 ClientConnectorCertificateError, 37 ClientConnectorError, 38 ClientConnectorSSLError, 39 ClientHttpProxyError, 40 ClientProxyConnectionError, 41 ServerFingerprintMismatch, 42 cert_errors, 43 ssl_errors, 44) 45from .client_proto import ResponseHandler 46from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params 47from .helpers import PY_36, CeilTimeout, get_running_loop, is_ip_address, noop, sentinel 48from .http import RESPONSES 49from .locks import EventResultOrError 50from .resolver import DefaultResolver 51 52try: 53 import ssl 54 55 SSLContext = ssl.SSLContext 56except ImportError: # pragma: no cover 57 ssl = None # type: ignore 58 SSLContext = object # type: ignore 59 60 61__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector") 62 63 64if TYPE_CHECKING: # pragma: no cover 65 from .client import ClientTimeout 66 from .client_reqrep import ConnectionKey 67 from .tracing import Trace 68 69 70class _DeprecationWaiter: 71 __slots__ = ("_awaitable", "_awaited") 72 73 def __init__(self, awaitable: Awaitable[Any]) -> None: 74 self._awaitable = awaitable 75 self._awaited = False 76 77 def __await__(self) -> Any: 78 self._awaited = True 79 return self._awaitable.__await__() 80 81 def __del__(self) -> None: 82 if not self._awaited: 83 warnings.warn( 84 "Connector.close() is a coroutine, " 85 "please use await connector.close()", 86 DeprecationWarning, 87 ) 88 89 90class Connection: 91 92 _source_traceback = None 93 _transport = None 94 95 def __init__( 96 self, 97 connector: "BaseConnector", 98 key: "ConnectionKey", 99 protocol: ResponseHandler, 100 loop: asyncio.AbstractEventLoop, 101 ) -> None: 102 self._key = key 103 self._connector = connector 104 self._loop = loop 105 self._protocol = protocol # type: Optional[ResponseHandler] 106 self._callbacks = [] # type: List[Callable[[], None]] 107 108 if loop.get_debug(): 109 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 110 111 def __repr__(self) -> str: 112 return f"Connection<{self._key}>" 113 114 def __del__(self, _warnings: Any = warnings) -> None: 115 if self._protocol is not None: 116 if PY_36: 117 kwargs = {"source": self} 118 else: 119 kwargs = {} 120 _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs) 121 if self._loop.is_closed(): 122 return 123 124 self._connector._release(self._key, self._protocol, should_close=True) 125 126 context = {"client_connection": self, "message": "Unclosed connection"} 127 if self._source_traceback is not None: 128 context["source_traceback"] = self._source_traceback 129 self._loop.call_exception_handler(context) 130 131 @property 132 def loop(self) -> asyncio.AbstractEventLoop: 133 warnings.warn( 134 "connector.loop property is deprecated", DeprecationWarning, stacklevel=2 135 ) 136 return self._loop 137 138 @property 139 def transport(self) -> Optional[asyncio.Transport]: 140 if self._protocol is None: 141 return None 142 return self._protocol.transport 143 144 @property 145 def protocol(self) -> Optional[ResponseHandler]: 146 return self._protocol 147 148 def add_callback(self, callback: Callable[[], None]) -> None: 149 if callback is not None: 150 self._callbacks.append(callback) 151 152 def _notify_release(self) -> None: 153 callbacks, self._callbacks = self._callbacks[:], [] 154 155 for cb in callbacks: 156 with suppress(Exception): 157 cb() 158 159 def close(self) -> None: 160 self._notify_release() 161 162 if self._protocol is not None: 163 self._connector._release(self._key, self._protocol, should_close=True) 164 self._protocol = None 165 166 def release(self) -> None: 167 self._notify_release() 168 169 if self._protocol is not None: 170 self._connector._release( 171 self._key, self._protocol, should_close=self._protocol.should_close 172 ) 173 self._protocol = None 174 175 @property 176 def closed(self) -> bool: 177 return self._protocol is None or not self._protocol.is_connected() 178 179 180class _TransportPlaceholder: 181 """ placeholder for BaseConnector.connect function """ 182 183 def close(self) -> None: 184 pass 185 186 187class BaseConnector: 188 """Base connector class. 189 190 keepalive_timeout - (optional) Keep-alive timeout. 191 force_close - Set to True to force close and do reconnect 192 after each request (and between redirects). 193 limit - The total number of simultaneous connections. 194 limit_per_host - Number of simultaneous connections to one host. 195 enable_cleanup_closed - Enables clean-up closed ssl transports. 196 Disabled by default. 197 loop - Optional event loop. 198 """ 199 200 _closed = True # prevent AttributeError in __del__ if ctor was failed 201 _source_traceback = None 202 203 # abort transport after 2 seconds (cleanup broken connections) 204 _cleanup_closed_period = 2.0 205 206 def __init__( 207 self, 208 *, 209 keepalive_timeout: Union[object, None, float] = sentinel, 210 force_close: bool = False, 211 limit: int = 100, 212 limit_per_host: int = 0, 213 enable_cleanup_closed: bool = False, 214 loop: Optional[asyncio.AbstractEventLoop] = None, 215 ) -> None: 216 217 if force_close: 218 if keepalive_timeout is not None and keepalive_timeout is not sentinel: 219 raise ValueError( 220 "keepalive_timeout cannot " "be set if force_close is True" 221 ) 222 else: 223 if keepalive_timeout is sentinel: 224 keepalive_timeout = 15.0 225 226 loop = get_running_loop(loop) 227 228 self._closed = False 229 if loop.get_debug(): 230 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 231 232 self._conns = ( 233 {} 234 ) # type: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] 235 self._limit = limit 236 self._limit_per_host = limit_per_host 237 self._acquired = set() # type: Set[ResponseHandler] 238 self._acquired_per_host = defaultdict( 239 set 240 ) # type: DefaultDict[ConnectionKey, Set[ResponseHandler]] 241 self._keepalive_timeout = cast(float, keepalive_timeout) 242 self._force_close = force_close 243 244 # {host_key: FIFO list of waiters} 245 self._waiters = defaultdict(deque) # type: ignore 246 247 self._loop = loop 248 self._factory = functools.partial(ResponseHandler, loop=loop) 249 250 self.cookies = SimpleCookie() # type: SimpleCookie[str] 251 252 # start keep-alive connection cleanup task 253 self._cleanup_handle = None 254 255 # start cleanup closed transports task 256 self._cleanup_closed_handle = None 257 self._cleanup_closed_disabled = not enable_cleanup_closed 258 self._cleanup_closed_transports = [] # type: List[Optional[asyncio.Transport]] 259 self._cleanup_closed() 260 261 def __del__(self, _warnings: Any = warnings) -> None: 262 if self._closed: 263 return 264 if not self._conns: 265 return 266 267 conns = [repr(c) for c in self._conns.values()] 268 269 self._close() 270 271 if PY_36: 272 kwargs = {"source": self} 273 else: 274 kwargs = {} 275 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs) 276 context = { 277 "connector": self, 278 "connections": conns, 279 "message": "Unclosed connector", 280 } 281 if self._source_traceback is not None: 282 context["source_traceback"] = self._source_traceback 283 self._loop.call_exception_handler(context) 284 285 def __enter__(self) -> "BaseConnector": 286 warnings.warn( 287 '"witn Connector():" is deprecated, ' 288 'use "async with Connector():" instead', 289 DeprecationWarning, 290 ) 291 return self 292 293 def __exit__(self, *exc: Any) -> None: 294 self.close() 295 296 async def __aenter__(self) -> "BaseConnector": 297 return self 298 299 async def __aexit__( 300 self, 301 exc_type: Optional[Type[BaseException]] = None, 302 exc_value: Optional[BaseException] = None, 303 exc_traceback: Optional[TracebackType] = None, 304 ) -> None: 305 await self.close() 306 307 @property 308 def force_close(self) -> bool: 309 """Ultimately close connection on releasing if True.""" 310 return self._force_close 311 312 @property 313 def limit(self) -> int: 314 """The total number for simultaneous connections. 315 316 If limit is 0 the connector has no limit. 317 The default limit size is 100. 318 """ 319 return self._limit 320 321 @property 322 def limit_per_host(self) -> int: 323 """The limit_per_host for simultaneous connections 324 to the same endpoint. 325 326 Endpoints are the same if they are have equal 327 (host, port, is_ssl) triple. 328 329 """ 330 return self._limit_per_host 331 332 def _cleanup(self) -> None: 333 """Cleanup unused transports.""" 334 if self._cleanup_handle: 335 self._cleanup_handle.cancel() 336 # _cleanup_handle should be unset, otherwise _release() will not 337 # recreate it ever! 338 self._cleanup_handle = None 339 340 now = self._loop.time() 341 timeout = self._keepalive_timeout 342 343 if self._conns: 344 connections = {} 345 deadline = now - timeout 346 for key, conns in self._conns.items(): 347 alive = [] 348 for proto, use_time in conns: 349 if proto.is_connected(): 350 if use_time - deadline < 0: 351 transport = proto.transport 352 proto.close() 353 if key.is_ssl and not self._cleanup_closed_disabled: 354 self._cleanup_closed_transports.append(transport) 355 else: 356 alive.append((proto, use_time)) 357 else: 358 transport = proto.transport 359 proto.close() 360 if key.is_ssl and not self._cleanup_closed_disabled: 361 self._cleanup_closed_transports.append(transport) 362 363 if alive: 364 connections[key] = alive 365 366 self._conns = connections 367 368 if self._conns: 369 self._cleanup_handle = helpers.weakref_handle( 370 self, "_cleanup", timeout, self._loop 371 ) 372 373 def _drop_acquired_per_host( 374 self, key: "ConnectionKey", val: ResponseHandler 375 ) -> None: 376 acquired_per_host = self._acquired_per_host 377 if key not in acquired_per_host: 378 return 379 conns = acquired_per_host[key] 380 conns.remove(val) 381 if not conns: 382 del self._acquired_per_host[key] 383 384 def _cleanup_closed(self) -> None: 385 """Double confirmation for transport close. 386 Some broken ssl servers may leave socket open without proper close. 387 """ 388 if self._cleanup_closed_handle: 389 self._cleanup_closed_handle.cancel() 390 391 for transport in self._cleanup_closed_transports: 392 if transport is not None: 393 transport.abort() 394 395 self._cleanup_closed_transports = [] 396 397 if not self._cleanup_closed_disabled: 398 self._cleanup_closed_handle = helpers.weakref_handle( 399 self, "_cleanup_closed", self._cleanup_closed_period, self._loop 400 ) 401 402 def close(self) -> Awaitable[None]: 403 """Close all opened transports.""" 404 self._close() 405 return _DeprecationWaiter(noop()) 406 407 def _close(self) -> None: 408 if self._closed: 409 return 410 411 self._closed = True 412 413 try: 414 if self._loop.is_closed(): 415 return 416 417 # cancel cleanup task 418 if self._cleanup_handle: 419 self._cleanup_handle.cancel() 420 421 # cancel cleanup close task 422 if self._cleanup_closed_handle: 423 self._cleanup_closed_handle.cancel() 424 425 for data in self._conns.values(): 426 for proto, t0 in data: 427 proto.close() 428 429 for proto in self._acquired: 430 proto.close() 431 432 for transport in self._cleanup_closed_transports: 433 if transport is not None: 434 transport.abort() 435 436 finally: 437 self._conns.clear() 438 self._acquired.clear() 439 self._waiters.clear() 440 self._cleanup_handle = None 441 self._cleanup_closed_transports.clear() 442 self._cleanup_closed_handle = None 443 444 @property 445 def closed(self) -> bool: 446 """Is connector closed. 447 448 A readonly property. 449 """ 450 return self._closed 451 452 def _available_connections(self, key: "ConnectionKey") -> int: 453 """ 454 Return number of available connections taking into account 455 the limit, limit_per_host and the connection key. 456 457 If it returns less than 1 means that there is no connections 458 availables. 459 """ 460 461 if self._limit: 462 # total calc available connections 463 available = self._limit - len(self._acquired) 464 465 # check limit per host 466 if ( 467 self._limit_per_host 468 and available > 0 469 and key in self._acquired_per_host 470 ): 471 acquired = self._acquired_per_host.get(key) 472 assert acquired is not None 473 available = self._limit_per_host - len(acquired) 474 475 elif self._limit_per_host and key in self._acquired_per_host: 476 # check limit per host 477 acquired = self._acquired_per_host.get(key) 478 assert acquired is not None 479 available = self._limit_per_host - len(acquired) 480 else: 481 available = 1 482 483 return available 484 485 async def connect( 486 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 487 ) -> Connection: 488 """Get from pool or create new connection.""" 489 key = req.connection_key 490 available = self._available_connections(key) 491 492 # Wait if there are no available connections or if there are/were 493 # waiters (i.e. don't steal connection from a waiter about to wake up) 494 if available <= 0 or key in self._waiters: 495 fut = self._loop.create_future() 496 497 # This connection will now count towards the limit. 498 self._waiters[key].append(fut) 499 500 if traces: 501 for trace in traces: 502 await trace.send_connection_queued_start() 503 504 try: 505 await fut 506 except BaseException as e: 507 if key in self._waiters: 508 # remove a waiter even if it was cancelled, normally it's 509 # removed when it's notified 510 try: 511 self._waiters[key].remove(fut) 512 except ValueError: # fut may no longer be in list 513 pass 514 515 raise e 516 finally: 517 if key in self._waiters and not self._waiters[key]: 518 del self._waiters[key] 519 520 if traces: 521 for trace in traces: 522 await trace.send_connection_queued_end() 523 524 proto = self._get(key) 525 if proto is None: 526 placeholder = cast(ResponseHandler, _TransportPlaceholder()) 527 self._acquired.add(placeholder) 528 self._acquired_per_host[key].add(placeholder) 529 530 if traces: 531 for trace in traces: 532 await trace.send_connection_create_start() 533 534 try: 535 proto = await self._create_connection(req, traces, timeout) 536 if self._closed: 537 proto.close() 538 raise ClientConnectionError("Connector is closed.") 539 except BaseException: 540 if not self._closed: 541 self._acquired.remove(placeholder) 542 self._drop_acquired_per_host(key, placeholder) 543 self._release_waiter() 544 raise 545 else: 546 if not self._closed: 547 self._acquired.remove(placeholder) 548 self._drop_acquired_per_host(key, placeholder) 549 550 if traces: 551 for trace in traces: 552 await trace.send_connection_create_end() 553 else: 554 if traces: 555 for trace in traces: 556 await trace.send_connection_reuseconn() 557 558 self._acquired.add(proto) 559 self._acquired_per_host[key].add(proto) 560 return Connection(self, key, proto, self._loop) 561 562 def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]: 563 try: 564 conns = self._conns[key] 565 except KeyError: 566 return None 567 568 t1 = self._loop.time() 569 while conns: 570 proto, t0 = conns.pop() 571 if proto.is_connected(): 572 if t1 - t0 > self._keepalive_timeout: 573 transport = proto.transport 574 proto.close() 575 # only for SSL transports 576 if key.is_ssl and not self._cleanup_closed_disabled: 577 self._cleanup_closed_transports.append(transport) 578 else: 579 if not conns: 580 # The very last connection was reclaimed: drop the key 581 del self._conns[key] 582 return proto 583 else: 584 transport = proto.transport 585 proto.close() 586 if key.is_ssl and not self._cleanup_closed_disabled: 587 self._cleanup_closed_transports.append(transport) 588 589 # No more connections: drop the key 590 del self._conns[key] 591 return None 592 593 def _release_waiter(self) -> None: 594 """ 595 Iterates over all waiters till found one that is not finsihed and 596 belongs to a host that has available connections. 597 """ 598 if not self._waiters: 599 return 600 601 # Having the dict keys ordered this avoids to iterate 602 # at the same order at each call. 603 queues = list(self._waiters.keys()) 604 random.shuffle(queues) 605 606 for key in queues: 607 if self._available_connections(key) < 1: 608 continue 609 610 waiters = self._waiters[key] 611 while waiters: 612 waiter = waiters.popleft() 613 if not waiter.done(): 614 waiter.set_result(None) 615 return 616 617 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None: 618 if self._closed: 619 # acquired connection is already released on connector closing 620 return 621 622 try: 623 self._acquired.remove(proto) 624 self._drop_acquired_per_host(key, proto) 625 except KeyError: # pragma: no cover 626 # this may be result of undetermenistic order of objects 627 # finalization due garbage collection. 628 pass 629 else: 630 self._release_waiter() 631 632 def _release( 633 self, 634 key: "ConnectionKey", 635 protocol: ResponseHandler, 636 *, 637 should_close: bool = False, 638 ) -> None: 639 if self._closed: 640 # acquired connection is already released on connector closing 641 return 642 643 self._release_acquired(key, protocol) 644 645 if self._force_close: 646 should_close = True 647 648 if should_close or protocol.should_close: 649 transport = protocol.transport 650 protocol.close() 651 652 if key.is_ssl and not self._cleanup_closed_disabled: 653 self._cleanup_closed_transports.append(transport) 654 else: 655 conns = self._conns.get(key) 656 if conns is None: 657 conns = self._conns[key] = [] 658 conns.append((protocol, self._loop.time())) 659 660 if self._cleanup_handle is None: 661 self._cleanup_handle = helpers.weakref_handle( 662 self, "_cleanup", self._keepalive_timeout, self._loop 663 ) 664 665 async def _create_connection( 666 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 667 ) -> ResponseHandler: 668 raise NotImplementedError() 669 670 671class _DNSCacheTable: 672 def __init__(self, ttl: Optional[float] = None) -> None: 673 self._addrs_rr = ( 674 {} 675 ) # type: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] 676 self._timestamps = {} # type: Dict[Tuple[str, int], float] 677 self._ttl = ttl 678 679 def __contains__(self, host: object) -> bool: 680 return host in self._addrs_rr 681 682 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None: 683 self._addrs_rr[key] = (cycle(addrs), len(addrs)) 684 685 if self._ttl: 686 self._timestamps[key] = monotonic() 687 688 def remove(self, key: Tuple[str, int]) -> None: 689 self._addrs_rr.pop(key, None) 690 691 if self._ttl: 692 self._timestamps.pop(key, None) 693 694 def clear(self) -> None: 695 self._addrs_rr.clear() 696 self._timestamps.clear() 697 698 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]: 699 loop, length = self._addrs_rr[key] 700 addrs = list(islice(loop, length)) 701 # Consume one more element to shift internal state of `cycle` 702 next(loop) 703 return addrs 704 705 def expired(self, key: Tuple[str, int]) -> bool: 706 if self._ttl is None: 707 return False 708 709 return self._timestamps[key] + self._ttl < monotonic() 710 711 712class TCPConnector(BaseConnector): 713 """TCP connector. 714 715 verify_ssl - Set to True to check ssl certifications. 716 fingerprint - Pass the binary sha256 717 digest of the expected certificate in DER format to verify 718 that the certificate the server presents matches. See also 719 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning 720 resolver - Enable DNS lookups and use this 721 resolver 722 use_dns_cache - Use memory cache for DNS lookups. 723 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 724 family - socket address family 725 local_addr - local tuple of (host, port) to bind socket to 726 727 keepalive_timeout - (optional) Keep-alive timeout. 728 force_close - Set to True to force close and do reconnect 729 after each request (and between redirects). 730 limit - The total number of simultaneous connections. 731 limit_per_host - Number of simultaneous connections to one host. 732 enable_cleanup_closed - Enables clean-up closed ssl transports. 733 Disabled by default. 734 loop - Optional event loop. 735 """ 736 737 def __init__( 738 self, 739 *, 740 verify_ssl: bool = True, 741 fingerprint: Optional[bytes] = None, 742 use_dns_cache: bool = True, 743 ttl_dns_cache: Optional[int] = 10, 744 family: int = 0, 745 ssl_context: Optional[SSLContext] = None, 746 ssl: Union[None, bool, Fingerprint, SSLContext] = None, 747 local_addr: Optional[Tuple[str, int]] = None, 748 resolver: Optional[AbstractResolver] = None, 749 keepalive_timeout: Union[None, float, object] = sentinel, 750 force_close: bool = False, 751 limit: int = 100, 752 limit_per_host: int = 0, 753 enable_cleanup_closed: bool = False, 754 loop: Optional[asyncio.AbstractEventLoop] = None, 755 ): 756 super().__init__( 757 keepalive_timeout=keepalive_timeout, 758 force_close=force_close, 759 limit=limit, 760 limit_per_host=limit_per_host, 761 enable_cleanup_closed=enable_cleanup_closed, 762 loop=loop, 763 ) 764 765 self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 766 if resolver is None: 767 resolver = DefaultResolver(loop=self._loop) 768 self._resolver = resolver 769 770 self._use_dns_cache = use_dns_cache 771 self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache) 772 self._throttle_dns_events = ( 773 {} 774 ) # type: Dict[Tuple[str, int], EventResultOrError] 775 self._family = family 776 self._local_addr = local_addr 777 778 def close(self) -> Awaitable[None]: 779 """Close all ongoing DNS calls.""" 780 for ev in self._throttle_dns_events.values(): 781 ev.cancel() 782 783 return super().close() 784 785 @property 786 def family(self) -> int: 787 """Socket family like AF_INET.""" 788 return self._family 789 790 @property 791 def use_dns_cache(self) -> bool: 792 """True if local DNS caching is enabled.""" 793 return self._use_dns_cache 794 795 def clear_dns_cache( 796 self, host: Optional[str] = None, port: Optional[int] = None 797 ) -> None: 798 """Remove specified host/port or clear all dns local cache.""" 799 if host is not None and port is not None: 800 self._cached_hosts.remove((host, port)) 801 elif host is not None or port is not None: 802 raise ValueError("either both host and port " "or none of them are allowed") 803 else: 804 self._cached_hosts.clear() 805 806 async def _resolve_host( 807 self, host: str, port: int, traces: Optional[List["Trace"]] = None 808 ) -> List[Dict[str, Any]]: 809 if is_ip_address(host): 810 return [ 811 { 812 "hostname": host, 813 "host": host, 814 "port": port, 815 "family": self._family, 816 "proto": 0, 817 "flags": 0, 818 } 819 ] 820 821 if not self._use_dns_cache: 822 823 if traces: 824 for trace in traces: 825 await trace.send_dns_resolvehost_start(host) 826 827 res = await self._resolver.resolve(host, port, family=self._family) 828 829 if traces: 830 for trace in traces: 831 await trace.send_dns_resolvehost_end(host) 832 833 return res 834 835 key = (host, port) 836 837 if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)): 838 # get result early, before any await (#4014) 839 result = self._cached_hosts.next_addrs(key) 840 841 if traces: 842 for trace in traces: 843 await trace.send_dns_cache_hit(host) 844 return result 845 846 if key in self._throttle_dns_events: 847 # get event early, before any await (#4014) 848 event = self._throttle_dns_events[key] 849 if traces: 850 for trace in traces: 851 await trace.send_dns_cache_hit(host) 852 await event.wait() 853 else: 854 # update dict early, before any await (#4014) 855 self._throttle_dns_events[key] = EventResultOrError(self._loop) 856 if traces: 857 for trace in traces: 858 await trace.send_dns_cache_miss(host) 859 try: 860 861 if traces: 862 for trace in traces: 863 await trace.send_dns_resolvehost_start(host) 864 865 addrs = await self._resolver.resolve(host, port, family=self._family) 866 if traces: 867 for trace in traces: 868 await trace.send_dns_resolvehost_end(host) 869 870 self._cached_hosts.add(key, addrs) 871 self._throttle_dns_events[key].set() 872 except BaseException as e: 873 # any DNS exception, independently of the implementation 874 # is set for the waiters to raise the same exception. 875 self._throttle_dns_events[key].set(exc=e) 876 raise 877 finally: 878 self._throttle_dns_events.pop(key) 879 880 return self._cached_hosts.next_addrs(key) 881 882 async def _create_connection( 883 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 884 ) -> ResponseHandler: 885 """Create connection. 886 887 Has same keyword arguments as BaseEventLoop.create_connection. 888 """ 889 if req.proxy: 890 _, proto = await self._create_proxy_connection(req, traces, timeout) 891 else: 892 _, proto = await self._create_direct_connection(req, traces, timeout) 893 894 return proto 895 896 @staticmethod 897 @functools.lru_cache(None) 898 def _make_ssl_context(verified: bool) -> SSLContext: 899 if verified: 900 return ssl.create_default_context() 901 else: 902 sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 903 sslcontext.options |= ssl.OP_NO_SSLv2 904 sslcontext.options |= ssl.OP_NO_SSLv3 905 try: 906 sslcontext.options |= ssl.OP_NO_COMPRESSION 907 except AttributeError as attr_err: 908 warnings.warn( 909 "{!s}: The Python interpreter is compiled " 910 "against OpenSSL < 1.0.0. Ref: " 911 "https://docs.python.org/3/library/ssl.html" 912 "#ssl.OP_NO_COMPRESSION".format(attr_err), 913 ) 914 sslcontext.set_default_verify_paths() 915 return sslcontext 916 917 def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]: 918 """Logic to get the correct SSL context 919 920 0. if req.ssl is false, return None 921 922 1. if ssl_context is specified in req, use it 923 2. if _ssl_context is specified in self, use it 924 3. otherwise: 925 1. if verify_ssl is not specified in req, use self.ssl_context 926 (will generate a default context according to self.verify_ssl) 927 2. if verify_ssl is True in req, generate a default SSL context 928 3. if verify_ssl is False in req, generate a SSL context that 929 won't verify 930 """ 931 if req.is_ssl(): 932 if ssl is None: # pragma: no cover 933 raise RuntimeError("SSL is not supported.") 934 sslcontext = req.ssl 935 if isinstance(sslcontext, ssl.SSLContext): 936 return sslcontext 937 if sslcontext is not None: 938 # not verified or fingerprinted 939 return self._make_ssl_context(False) 940 sslcontext = self._ssl 941 if isinstance(sslcontext, ssl.SSLContext): 942 return sslcontext 943 if sslcontext is not None: 944 # not verified or fingerprinted 945 return self._make_ssl_context(False) 946 return self._make_ssl_context(True) 947 else: 948 return None 949 950 def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]: 951 ret = req.ssl 952 if isinstance(ret, Fingerprint): 953 return ret 954 ret = self._ssl 955 if isinstance(ret, Fingerprint): 956 return ret 957 return None 958 959 async def _wrap_create_connection( 960 self, 961 *args: Any, 962 req: "ClientRequest", 963 timeout: "ClientTimeout", 964 client_error: Type[Exception] = ClientConnectorError, 965 **kwargs: Any, 966 ) -> Tuple[asyncio.Transport, ResponseHandler]: 967 try: 968 with CeilTimeout(timeout.sock_connect): 969 return await self._loop.create_connection(*args, **kwargs) # type: ignore # noqa 970 except cert_errors as exc: 971 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 972 except ssl_errors as exc: 973 raise ClientConnectorSSLError(req.connection_key, exc) from exc 974 except OSError as exc: 975 raise client_error(req.connection_key, exc) from exc 976 977 async def _create_direct_connection( 978 self, 979 req: "ClientRequest", 980 traces: List["Trace"], 981 timeout: "ClientTimeout", 982 *, 983 client_error: Type[Exception] = ClientConnectorError, 984 ) -> Tuple[asyncio.Transport, ResponseHandler]: 985 sslcontext = self._get_ssl_context(req) 986 fingerprint = self._get_fingerprint(req) 987 988 host = req.url.raw_host 989 assert host is not None 990 port = req.port 991 assert port is not None 992 host_resolved = asyncio.ensure_future( 993 self._resolve_host(host, port, traces=traces), loop=self._loop 994 ) 995 try: 996 # Cancelling this lookup should not cancel the underlying lookup 997 # or else the cancel event will get broadcast to all the waiters 998 # across all connections. 999 hosts = await asyncio.shield(host_resolved) 1000 except asyncio.CancelledError: 1001 1002 def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None: 1003 with suppress(Exception, asyncio.CancelledError): 1004 fut.result() 1005 1006 host_resolved.add_done_callback(drop_exception) 1007 raise 1008 except OSError as exc: 1009 # in case of proxy it is not ClientProxyConnectionError 1010 # it is problem of resolving proxy ip itself 1011 raise ClientConnectorError(req.connection_key, exc) from exc 1012 1013 last_exc = None # type: Optional[Exception] 1014 1015 for hinfo in hosts: 1016 host = hinfo["host"] 1017 port = hinfo["port"] 1018 1019 try: 1020 transp, proto = await self._wrap_create_connection( 1021 self._factory, 1022 host, 1023 port, 1024 timeout=timeout, 1025 ssl=sslcontext, 1026 family=hinfo["family"], 1027 proto=hinfo["proto"], 1028 flags=hinfo["flags"], 1029 server_hostname=hinfo["hostname"] if sslcontext else None, 1030 local_addr=self._local_addr, 1031 req=req, 1032 client_error=client_error, 1033 ) 1034 except ClientConnectorError as exc: 1035 last_exc = exc 1036 continue 1037 1038 if req.is_ssl() and fingerprint: 1039 try: 1040 fingerprint.check(transp) 1041 except ServerFingerprintMismatch as exc: 1042 transp.close() 1043 if not self._cleanup_closed_disabled: 1044 self._cleanup_closed_transports.append(transp) 1045 last_exc = exc 1046 continue 1047 1048 return transp, proto 1049 else: 1050 assert last_exc is not None 1051 raise last_exc 1052 1053 async def _create_proxy_connection( 1054 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 1055 ) -> Tuple[asyncio.Transport, ResponseHandler]: 1056 headers = {} # type: Dict[str, str] 1057 if req.proxy_headers is not None: 1058 headers = req.proxy_headers # type: ignore 1059 headers[hdrs.HOST] = req.headers[hdrs.HOST] 1060 1061 url = req.proxy 1062 assert url is not None 1063 proxy_req = ClientRequest( 1064 hdrs.METH_GET, 1065 url, 1066 headers=headers, 1067 auth=req.proxy_auth, 1068 loop=self._loop, 1069 ssl=req.ssl, 1070 ) 1071 1072 # create connection to proxy server 1073 transport, proto = await self._create_direct_connection( 1074 proxy_req, [], timeout, client_error=ClientProxyConnectionError 1075 ) 1076 1077 # Many HTTP proxies has buggy keepalive support. Let's not 1078 # reuse connection but close it after processing every 1079 # response. 1080 proto.force_close() 1081 1082 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None) 1083 if auth is not None: 1084 if not req.is_ssl(): 1085 req.headers[hdrs.PROXY_AUTHORIZATION] = auth 1086 else: 1087 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth 1088 1089 if req.is_ssl(): 1090 sslcontext = self._get_ssl_context(req) 1091 # For HTTPS requests over HTTP proxy 1092 # we must notify proxy to tunnel connection 1093 # so we send CONNECT command: 1094 # CONNECT www.python.org:443 HTTP/1.1 1095 # Host: www.python.org 1096 # 1097 # next we must do TLS handshake and so on 1098 # to do this we must wrap raw socket into secure one 1099 # asyncio handles this perfectly 1100 proxy_req.method = hdrs.METH_CONNECT 1101 proxy_req.url = req.url 1102 key = attr.evolve( 1103 req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None 1104 ) 1105 conn = Connection(self, key, proto, self._loop) 1106 proxy_resp = await proxy_req.send(conn) 1107 try: 1108 protocol = conn._protocol 1109 assert protocol is not None 1110 protocol.set_response_params() 1111 resp = await proxy_resp.start(conn) 1112 except BaseException: 1113 proxy_resp.close() 1114 conn.close() 1115 raise 1116 else: 1117 conn._protocol = None 1118 conn._transport = None 1119 try: 1120 if resp.status != 200: 1121 message = resp.reason 1122 if message is None: 1123 message = RESPONSES[resp.status][0] 1124 raise ClientHttpProxyError( 1125 proxy_resp.request_info, 1126 resp.history, 1127 status=resp.status, 1128 message=message, 1129 headers=resp.headers, 1130 ) 1131 rawsock = transport.get_extra_info("socket", default=None) 1132 if rawsock is None: 1133 raise RuntimeError("Transport does not expose socket instance") 1134 # Duplicate the socket, so now we can close proxy transport 1135 rawsock = rawsock.dup() 1136 finally: 1137 transport.close() 1138 1139 transport, proto = await self._wrap_create_connection( 1140 self._factory, 1141 timeout=timeout, 1142 ssl=sslcontext, 1143 sock=rawsock, 1144 server_hostname=req.host, 1145 req=req, 1146 ) 1147 finally: 1148 proxy_resp.close() 1149 1150 return transport, proto 1151 1152 1153class UnixConnector(BaseConnector): 1154 """Unix socket connector. 1155 1156 path - Unix socket path. 1157 keepalive_timeout - (optional) Keep-alive timeout. 1158 force_close - Set to True to force close and do reconnect 1159 after each request (and between redirects). 1160 limit - The total number of simultaneous connections. 1161 limit_per_host - Number of simultaneous connections to one host. 1162 loop - Optional event loop. 1163 """ 1164 1165 def __init__( 1166 self, 1167 path: str, 1168 force_close: bool = False, 1169 keepalive_timeout: Union[object, float, None] = sentinel, 1170 limit: int = 100, 1171 limit_per_host: int = 0, 1172 loop: Optional[asyncio.AbstractEventLoop] = None, 1173 ) -> None: 1174 super().__init__( 1175 force_close=force_close, 1176 keepalive_timeout=keepalive_timeout, 1177 limit=limit, 1178 limit_per_host=limit_per_host, 1179 loop=loop, 1180 ) 1181 self._path = path 1182 1183 @property 1184 def path(self) -> str: 1185 """Path to unix socket.""" 1186 return self._path 1187 1188 async def _create_connection( 1189 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 1190 ) -> ResponseHandler: 1191 try: 1192 with CeilTimeout(timeout.sock_connect): 1193 _, proto = await self._loop.create_unix_connection( 1194 self._factory, self._path 1195 ) 1196 except OSError as exc: 1197 raise ClientConnectorError(req.connection_key, exc) from exc 1198 1199 return cast(ResponseHandler, proto) 1200 1201 1202class NamedPipeConnector(BaseConnector): 1203 """Named pipe connector. 1204 1205 Only supported by the proactor event loop. 1206 See also: https://docs.python.org/3.7/library/asyncio-eventloop.html 1207 1208 path - Windows named pipe path. 1209 keepalive_timeout - (optional) Keep-alive timeout. 1210 force_close - Set to True to force close and do reconnect 1211 after each request (and between redirects). 1212 limit - The total number of simultaneous connections. 1213 limit_per_host - Number of simultaneous connections to one host. 1214 loop - Optional event loop. 1215 """ 1216 1217 def __init__( 1218 self, 1219 path: str, 1220 force_close: bool = False, 1221 keepalive_timeout: Union[object, float, None] = sentinel, 1222 limit: int = 100, 1223 limit_per_host: int = 0, 1224 loop: Optional[asyncio.AbstractEventLoop] = None, 1225 ) -> None: 1226 super().__init__( 1227 force_close=force_close, 1228 keepalive_timeout=keepalive_timeout, 1229 limit=limit, 1230 limit_per_host=limit_per_host, 1231 loop=loop, 1232 ) 1233 if not isinstance(self._loop, asyncio.ProactorEventLoop): # type: ignore 1234 raise RuntimeError( 1235 "Named Pipes only available in proactor " "loop under windows" 1236 ) 1237 self._path = path 1238 1239 @property 1240 def path(self) -> str: 1241 """Path to the named pipe.""" 1242 return self._path 1243 1244 async def _create_connection( 1245 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 1246 ) -> ResponseHandler: 1247 try: 1248 with CeilTimeout(timeout.sock_connect): 1249 _, proto = await self._loop.create_pipe_connection( # type: ignore 1250 self._factory, self._path 1251 ) 1252 # the drain is required so that the connection_made is called 1253 # and transport is set otherwise it is not set before the 1254 # `assert conn.transport is not None` 1255 # in client.py's _request method 1256 await asyncio.sleep(0) 1257 # other option is to manually set transport like 1258 # `proto.transport = trans` 1259 except OSError as exc: 1260 raise ClientConnectorError(req.connection_key, exc) from exc 1261 1262 return cast(ResponseHandler, proto) 1263