1import asyncio
2import functools
3import random
4import sys
5import traceback
6import warnings
7from collections import defaultdict, deque
8from contextlib import suppress
9from http.cookies import SimpleCookie
10from itertools import cycle, islice
11from time import monotonic
12from types import TracebackType
13from typing import (
14    TYPE_CHECKING,
15    Any,
16    Awaitable,
17    Callable,
18    DefaultDict,
19    Dict,
20    Iterator,
21    List,
22    Optional,
23    Set,
24    Tuple,
25    Type,
26    Union,
27    cast,
28)
29
30import attr
31
32from . import hdrs, helpers
33from .abc import AbstractResolver
34from .client_exceptions import (
35    ClientConnectionError,
36    ClientConnectorCertificateError,
37    ClientConnectorError,
38    ClientConnectorSSLError,
39    ClientHttpProxyError,
40    ClientProxyConnectionError,
41    ServerFingerprintMismatch,
42    cert_errors,
43    ssl_errors,
44)
45from .client_proto import ResponseHandler
46from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
47from .helpers import PY_36, CeilTimeout, get_running_loop, is_ip_address, noop, sentinel
48from .http import RESPONSES
49from .locks import EventResultOrError
50from .resolver import DefaultResolver
51
52try:
53    import ssl
54
55    SSLContext = ssl.SSLContext
56except ImportError:  # pragma: no cover
57    ssl = None  # type: ignore
58    SSLContext = object  # type: ignore
59
60
61__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")
62
63
64if TYPE_CHECKING:  # pragma: no cover
65    from .client import ClientTimeout
66    from .client_reqrep import ConnectionKey
67    from .tracing import Trace
68
69
70class _DeprecationWaiter:
71    __slots__ = ("_awaitable", "_awaited")
72
73    def __init__(self, awaitable: Awaitable[Any]) -> None:
74        self._awaitable = awaitable
75        self._awaited = False
76
77    def __await__(self) -> Any:
78        self._awaited = True
79        return self._awaitable.__await__()
80
81    def __del__(self) -> None:
82        if not self._awaited:
83            warnings.warn(
84                "Connector.close() is a coroutine, "
85                "please use await connector.close()",
86                DeprecationWarning,
87            )
88
89
class Connection:
    """A single client connection checked out from a :class:`BaseConnector`.

    Wraps a :class:`ResponseHandler` protocol instance and hands it back
    to the owning connector on :meth:`release` (possibly for reuse) or
    :meth:`close` (always discarding it).
    """

    # Stack captured at construction when the loop is in debug mode;
    # reported through the loop's exception handler for unclosed
    # connections.
    _source_traceback = None
    _transport = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        self._protocol = protocol  # type: Optional[ResponseHandler]
        self._callbacks = []  # type: List[Callable[[], None]]

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # A non-None protocol means neither close() nor release() ran:
        # the connection leaked.  Warn, force-close it back into the
        # connector, and notify the loop's exception handler.
        if self._protocol is not None:
            if PY_36:
                kwargs = {"source": self}
            else:
                kwargs = {}
            _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Event loop the connection belongs to (deprecated accessor)."""
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Underlying transport, or ``None`` once released/closed."""
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """Underlying protocol, or ``None`` once released/closed."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run when the connection is released."""
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap the callback list out first so re-entrant registration
        # during a callback cannot affect this notification round.
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        """Close the connection, discarding the underlying protocol."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        """Return the connection to the pool (closing it if the
        protocol itself asks to be closed)."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(
                self._key, self._protocol, should_close=self._protocol.should_close
            )
            self._protocol = None

    @property
    def closed(self) -> bool:
        """True when released/closed or the transport dropped."""
        return self._protocol is None or not self._protocol.is_connected()
178
179
180class _TransportPlaceholder:
181    """ placeholder for BaseConnector.connect function """
182
183    def close(self) -> None:
184        pass
185
186
class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:

        if force_close:
            # keep-alive is meaningless when every request reconnects
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot " "be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = get_running_loop(loop)

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Idle pooled connections: key -> [(proto, last-use time), ...]
        self._conns = (
            {}
        )  # type: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]]
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired = set()  # type: Set[ResponseHandler]
        self._acquired_per_host = defaultdict(
            set
        )  # type: DefaultDict[ConnectionKey, Set[ResponseHandler]]
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        self._waiters = defaultdict(deque)  # type: ignore

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        self.cookies = SimpleCookie()  # type: SimpleCookie[str]

        # start keep-alive connection cleanup task
        self._cleanup_handle = None

        # start cleanup closed transports task
        self._cleanup_closed_handle = None
        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports = []  # type: List[Optional[asyncio.Transport]]
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        if PY_36:
            kwargs = {"source": self}
        else:
            kwargs = {}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        # BUGFIX: message previously said "witn Connector():"
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit_per_host for simultaneous connections
        to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.

        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = self._loop.time()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = {}
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive = []
                for proto, use_time in conns:
                    if proto.is_connected():
                        if use_time - deadline < 0:
                            # idle longer than the keep-alive timeout
                            transport = proto.transport
                            proto.close()
                            if key.is_ssl and not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(transport)
                        else:
                            alive.append((proto, use_time))
                    else:
                        # peer already dropped the connection
                        transport = proto.transport
                        proto.close()
                        if key.is_ssl and not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self, "_cleanup", timeout, self._loop
            )

    def _drop_acquired_per_host(
        self, key: "ConnectionKey", val: ResponseHandler
    ) -> None:
        """Remove *val* from the per-host acquired set, dropping the key
        entirely once the set becomes empty."""
        acquired_per_host = self._acquired_per_host
        if key not in acquired_per_host:
            return
        conns = acquired_per_host[key]
        conns.remove(val)
        if not conns:
            del self._acquired_per_host[key]

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.
        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self, "_cleanup_closed", self._cleanup_closed_period, self._loop
            )

    def close(self) -> Awaitable[None]:
        """Close all opened transports."""
        self._close()
        return _DeprecationWaiter(noop())

    def _close(self) -> None:
        """Synchronously close every pooled and acquired connection and
        cancel the background cleanup handles (idempotent)."""
        if self._closed:
            return

        self._closed = True

        try:
            if self._loop.is_closed():
                return

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, t0 in data:
                    proto.close()

            for proto in self._acquired:
                proto.close()

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

        finally:
            self._conns.clear()
            self._acquired.clear()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections taking into account
        the limit, limit_per_host and the connection key.

        If it returns less than 1 means that there are no connections
        available.
        """

        if self._limit:
            # total calc available connections
            available = self._limit - len(self._acquired)

            # check limit per host
            if (
                self._limit_per_host
                and available > 0
                and key in self._acquired_per_host
            ):
                acquired = self._acquired_per_host.get(key)
                assert acquired is not None
                available = self._limit_per_host - len(acquired)

        elif self._limit_per_host and key in self._acquired_per_host:
            # check limit per host
            acquired = self._acquired_per_host.get(key)
            assert acquired is not None
            available = self._limit_per_host - len(acquired)
        else:
            available = 1

        return available

    async def connect(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        available = self._available_connections(key)

        # Wait if there are no available connections or if there are/were
        # waiters (i.e. don't steal connection from a waiter about to wake up)
        if available <= 0 or key in self._waiters:
            fut = self._loop.create_future()

            # This connection will now count towards the limit.
            self._waiters[key].append(fut)

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_start()

            try:
                await fut
            except BaseException as e:
                if key in self._waiters:
                    # remove a waiter even if it was cancelled, normally it's
                    #  removed when it's notified
                    try:
                        self._waiters[key].remove(fut)
                    except ValueError:  # fut may no longer be in list
                        pass

                raise e
            finally:
                if key in self._waiters and not self._waiters[key]:
                    del self._waiters[key]

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_end()

        proto = self._get(key)
        if proto is None:
            # Reserve a slot with a placeholder while the real connection
            # is being established so limits stay accurate across awaits.
            placeholder = cast(ResponseHandler, _TransportPlaceholder())
            self._acquired.add(placeholder)
            self._acquired_per_host[key].add(placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_start()

            try:
                proto = await self._create_connection(req, traces, timeout)
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")
            except BaseException:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)
                    self._release_waiter()
                raise
            else:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_end()
        else:
            if traces:
                for trace in traces:
                    await trace.send_connection_reuseconn()

        self._acquired.add(proto)
        self._acquired_per_host[key].add(proto)
        return Connection(self, key, proto, self._loop)

    def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
        """Pop a still-alive pooled connection for *key*, or return None.

        Dead or keep-alive-expired entries encountered on the way are
        closed and discarded.
        """
        try:
            conns = self._conns[key]
        except KeyError:
            return None

        t1 = self._loop.time()
        while conns:
            proto, t0 = conns.pop()
            if proto.is_connected():
                if t1 - t0 > self._keepalive_timeout:
                    transport = proto.transport
                    proto.close()
                    # only for SSL transports
                    if key.is_ssl and not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transport)
                else:
                    if not conns:
                        # The very last connection was reclaimed: drop the key
                        del self._conns[key]
                    return proto
            else:
                transport = proto.transport
                proto.close()
                if key.is_ssl and not self._cleanup_closed_disabled:
                    self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters till found one that is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Having the dict keys ordered this avoids to iterate
        # at the same order at each call.
        queues = list(self._waiters.keys())
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter = waiters.popleft()
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Remove *proto* from the acquired bookkeeping and wake one
        waiter, if any."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        try:
            self._acquired.remove(proto)
            self._drop_acquired_per_host(key, proto)
        except KeyError:  # pragma: no cover
            # this may be result of undetermenistic order of objects
            # finalization due garbage collection.
            pass
        else:
            self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Return *protocol* to the idle pool, or close it when
        *should_close*, force_close, or the protocol itself demands it."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close:
            should_close = True

        if should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
        else:
            conns = self._conns.get(key)
            if conns is None:
                conns = self._conns[key] = []
            conns.append((protocol, self._loop.time()))

            if self._cleanup_handle is None:
                self._cleanup_handle = helpers.weakref_handle(
                    self, "_cleanup", self._keepalive_timeout, self._loop
                )

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Establish a new connection; implemented by subclasses."""
        raise NotImplementedError()
669
670
671class _DNSCacheTable:
672    def __init__(self, ttl: Optional[float] = None) -> None:
673        self._addrs_rr = (
674            {}
675        )  # type: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]]
676        self._timestamps = {}  # type: Dict[Tuple[str, int], float]
677        self._ttl = ttl
678
679    def __contains__(self, host: object) -> bool:
680        return host in self._addrs_rr
681
682    def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None:
683        self._addrs_rr[key] = (cycle(addrs), len(addrs))
684
685        if self._ttl:
686            self._timestamps[key] = monotonic()
687
688    def remove(self, key: Tuple[str, int]) -> None:
689        self._addrs_rr.pop(key, None)
690
691        if self._ttl:
692            self._timestamps.pop(key, None)
693
694    def clear(self) -> None:
695        self._addrs_rr.clear()
696        self._timestamps.clear()
697
698    def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]:
699        loop, length = self._addrs_rr[key]
700        addrs = list(islice(loop, length))
701        # Consume one more element to shift internal state of `cycle`
702        next(loop)
703        return addrs
704
705    def expired(self, key: Tuple[str, int]) -> bool:
706        if self._ttl is None:
707            return False
708
709        return self._timestamps[key] + self._ttl < monotonic()
710
711
712class TCPConnector(BaseConnector):
713    """TCP connector.
714
715    verify_ssl - Set to True to check ssl certifications.
716    fingerprint - Pass the binary sha256
717        digest of the expected certificate in DER format to verify
718        that the certificate the server presents matches. See also
719        https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning
720    resolver - Enable DNS lookups and use this
721        resolver
722    use_dns_cache - Use memory cache for DNS lookups.
723    ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
724    family - socket address family
725    local_addr - local tuple of (host, port) to bind socket to
726
727    keepalive_timeout - (optional) Keep-alive timeout.
728    force_close - Set to True to force close and do reconnect
729        after each request (and between redirects).
730    limit - The total number of simultaneous connections.
731    limit_per_host - Number of simultaneous connections to one host.
732    enable_cleanup_closed - Enables clean-up closed ssl transports.
733                            Disabled by default.
734    loop - Optional event loop.
735    """
736
    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: int = 0,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[None, bool, Fingerprint, SSLContext] = None,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Initialize the TCP connector; see the class docstring for
        the meaning of each keyword argument."""
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
        )

        # Collapse the legacy verify_ssl/ssl_context/fingerprint trio and
        # the newer `ssl` argument into a single normalized value.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
        if resolver is None:
            resolver = DefaultResolver(loop=self._loop)
        self._resolver = resolver

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # In-flight DNS lookups, so concurrent requests for the same
        # (host, port) share one resolution instead of racing.
        self._throttle_dns_events = (
            {}
        )  # type: Dict[Tuple[str, int], EventResultOrError]
        self._family = family
        self._local_addr = local_addr
777
778    def close(self) -> Awaitable[None]:
779        """Close all ongoing DNS calls."""
780        for ev in self._throttle_dns_events.values():
781            ev.cancel()
782
783        return super().close()
784
    @property
    def family(self) -> int:
        """Socket family like AF_INET (0 means unspecified)."""
        return self._family
789
    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled for this connector."""
        return self._use_dns_cache
794
795    def clear_dns_cache(
796        self, host: Optional[str] = None, port: Optional[int] = None
797    ) -> None:
798        """Remove specified host/port or clear all dns local cache."""
799        if host is not None and port is not None:
800            self._cached_hosts.remove((host, port))
801        elif host is not None or port is not None:
802            raise ValueError("either both host and port " "or none of them are allowed")
803        else:
804            self._cached_hosts.clear()
805
    async def _resolve_host(
        self, host: str, port: int, traces: Optional[List["Trace"]] = None
    ) -> List[Dict[str, Any]]:
        """Resolve *host*/*port* to a list of address dicts.

        IP literals bypass resolution; otherwise results come from the
        round-robin DNS cache when enabled, with concurrent lookups for
        the same key coalesced through a shared throttle event.
        """
        if is_ip_address(host):
            # Already an address - synthesize a single entry, no DNS call.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)

        if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        if key in self._throttle_dns_events:
            # Another task is already resolving this key; wait for it.
            # get event early, before any await (#4014)
            event = self._throttle_dns_events[key]
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            await event.wait()
        else:
            # We are the first resolver for this key.
            # update dict early, before any await (#4014)
            self._throttle_dns_events[key] = EventResultOrError(self._loop)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)
            try:

                if traces:
                    for trace in traces:
                        await trace.send_dns_resolvehost_start(host)

                addrs = await self._resolver.resolve(host, port, family=self._family)
                if traces:
                    for trace in traces:
                        await trace.send_dns_resolvehost_end(host)

                self._cached_hosts.add(key, addrs)
                self._throttle_dns_events[key].set()
            except BaseException as e:
                # any DNS exception, independently of the implementation
                # is set for the waiters to raise the same exception.
                self._throttle_dns_events[key].set(exc=e)
                raise
            finally:
                # Lookup finished (either way): let future callers resolve
                # afresh instead of waiting on a completed event.
                self._throttle_dns_events.pop(key)

        return self._cached_hosts.next_addrs(key)
881
882    async def _create_connection(
883        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
884    ) -> ResponseHandler:
885        """Create connection.
886
887        Has same keyword arguments as BaseEventLoop.create_connection.
888        """
889        if req.proxy:
890            _, proto = await self._create_proxy_connection(req, traces, timeout)
891        else:
892            _, proto = await self._create_direct_connection(req, traces, timeout)
893
894        return proto
895
    @staticmethod
    @functools.lru_cache(None)
    def _make_ssl_context(verified: bool) -> SSLContext:
        """Build (and memoize) the default SSL context.

        With ``verified`` True a fully verifying default context is
        returned; otherwise a permissive context (no cert verification)
        with SSLv2/SSLv3 disabled.  lru_cache ensures one shared context
        per flag value.
        """
        if verified:
            return ssl.create_default_context()
        else:
            sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            sslcontext.options |= ssl.OP_NO_SSLv2
            sslcontext.options |= ssl.OP_NO_SSLv3
            try:
                sslcontext.options |= ssl.OP_NO_COMPRESSION
            except AttributeError as attr_err:
                # OP_NO_COMPRESSION requires OpenSSL >= 1.0.0.
                warnings.warn(
                    "{!s}: The Python interpreter is compiled "
                    "against OpenSSL < 1.0.0. Ref: "
                    "https://docs.python.org/3/library/ssl.html"
                    "#ssl.OP_NO_COMPRESSION".format(attr_err),
                )
            sslcontext.set_default_verify_paths()
            return sslcontext
916
917    def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]:
918        """Logic to get the correct SSL context
919
920        0. if req.ssl is false, return None
921
922        1. if ssl_context is specified in req, use it
923        2. if _ssl_context is specified in self, use it
924        3. otherwise:
925            1. if verify_ssl is not specified in req, use self.ssl_context
926               (will generate a default context according to self.verify_ssl)
927            2. if verify_ssl is True in req, generate a default SSL context
928            3. if verify_ssl is False in req, generate a SSL context that
929               won't verify
930        """
931        if req.is_ssl():
932            if ssl is None:  # pragma: no cover
933                raise RuntimeError("SSL is not supported.")
934            sslcontext = req.ssl
935            if isinstance(sslcontext, ssl.SSLContext):
936                return sslcontext
937            if sslcontext is not None:
938                # not verified or fingerprinted
939                return self._make_ssl_context(False)
940            sslcontext = self._ssl
941            if isinstance(sslcontext, ssl.SSLContext):
942                return sslcontext
943            if sslcontext is not None:
944                # not verified or fingerprinted
945                return self._make_ssl_context(False)
946            return self._make_ssl_context(True)
947        else:
948            return None
949
950    def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]:
951        ret = req.ssl
952        if isinstance(ret, Fingerprint):
953            return ret
954        ret = self._ssl
955        if isinstance(ret, Fingerprint):
956            return ret
957        return None
958
959    async def _wrap_create_connection(
960        self,
961        *args: Any,
962        req: "ClientRequest",
963        timeout: "ClientTimeout",
964        client_error: Type[Exception] = ClientConnectorError,
965        **kwargs: Any,
966    ) -> Tuple[asyncio.Transport, ResponseHandler]:
967        try:
968            with CeilTimeout(timeout.sock_connect):
969                return await self._loop.create_connection(*args, **kwargs)  # type: ignore  # noqa
970        except cert_errors as exc:
971            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
972        except ssl_errors as exc:
973            raise ClientConnectorSSLError(req.connection_key, exc) from exc
974        except OSError as exc:
975            raise client_error(req.connection_key, exc) from exc
976
    async def _create_direct_connection(
        self,
        req: "ClientRequest",
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the request's host and connect to it directly (no proxy).

        Every resolved address is tried in order; the first connection that
        succeeds (and, for SSL requests with a fingerprint, passes the
        fingerprint check) is returned.  If all candidates fail, the last
        exception encountered is re-raised.

        Raises:
            ClientConnectorError: when DNS resolution itself fails.
            client_error / ServerFingerprintMismatch: the last per-address
                failure, when every resolved address fails.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        port = req.port
        assert port is not None
        # Run resolution as an independent task so it can be shielded below.
        host_resolved = asyncio.ensure_future(
            self._resolve_host(host, port, traces=traces), loop=self._loop
        )
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            #  or else the cancel event will get broadcast to all the waiters
            #  across all connections.
            hosts = await asyncio.shield(host_resolved)
        except asyncio.CancelledError:

            def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
                # Consume the shielded task's outcome so an un-retrieved
                # exception is not logged by the event loop later.
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            host_resolved.add_done_callback(drop_exception)
            raise
        except OSError as exc:
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorError(req.connection_key, exc) from exc

        last_exc = None  # type: Optional[Exception]

        for hinfo in hosts:
            host = hinfo["host"]
            port = hinfo["port"]

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    host,
                    port,
                    timeout=timeout,
                    ssl=sslcontext,
                    family=hinfo["family"],
                    proto=hinfo["proto"],
                    flags=hinfo["flags"],
                    server_hostname=hinfo["hostname"] if sslcontext else None,
                    local_addr=self._local_addr,
                    req=req,
                    client_error=client_error,
                )
            except ClientConnectorError as exc:
                # Remember the failure and fall through to the next address.
                last_exc = exc
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    # Close the mismatched transport; queue it for the
                    # cleanup-closed machinery unless that is disabled.
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    continue

            return transp, proto
        else:
            # Loop exhausted without returning: every address failed.
            assert last_exc is not None
            raise last_exc
1052
    async def _create_proxy_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Connect to *req*'s target through its configured HTTP proxy.

        For plain-HTTP requests the proxy connection itself is returned.
        For HTTPS requests a CONNECT tunnel is established first, the raw
        socket is duplicated, and a TLS connection is created on top of it.

        Raises:
            ClientProxyConnectionError: when connecting to the proxy fails.
            ClientHttpProxyError: when the proxy rejects the CONNECT request.
        """
        headers = {}  # type: Dict[str, str]
        if req.proxy_headers is not None:
            headers = req.proxy_headers  # type: ignore
        # NOTE(review): when proxy_headers is set, this mutates the caller's
        # mapping in place (HOST is added to req.proxy_headers) -- confirm
        # this is intentional before relying on proxy_headers elsewhere.
        headers[hdrs.HOST] = req.headers[hdrs.HOST]

        url = req.proxy
        assert url is not None
        # Build a GET request aimed at the proxy itself; for HTTPS it is
        # converted to CONNECT below.
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        # Many HTTP proxies has buggy keepalive support.  Let's not
        # reuse connection but close it after processing every
        # response.
        proto.force_close()

        # Move proxy auth from Authorization to Proxy-Authorization; for
        # tunneled (HTTPS) requests it must go on the CONNECT request.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

        if req.is_ssl():
            sslcontext = self._get_ssl_context(req)
            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            #   CONNECT www.python.org:443 HTTP/1.1
            #   Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            # Key the tunnel connection as if it were direct (no proxy bits).
            key = attr.evolve(
                req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = Connection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None
                protocol.set_response_params()
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach protocol/transport from conn: ownership moves to the
                # TLS connection created below.
                conn._protocol = None
                conn._transport = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = RESPONSES[resp.status][0]
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    rawsock = transport.get_extra_info("socket", default=None)
                    if rawsock is None:
                        raise RuntimeError("Transport does not expose socket instance")
                    # Duplicate the socket, so now we can close proxy transport
                    rawsock = rawsock.dup()
                finally:
                    # The plain proxy transport is always closed; the dup'ed
                    # socket (if any) carries the tunnel from here on.
                    transport.close()

                transport, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    sock=rawsock,
                    server_hostname=req.host,
                    req=req,
                )
            finally:
                proxy_resp.close()

        return transport, proto
1151
1152
class UnixConnector(BaseConnector):
    """Connector that speaks HTTP over a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        # Filesystem path of the Unix socket to connect to.
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the configured Unix socket and return the protocol.

        Raises ClientConnectorError when the OS-level connect fails.
        """
        try:
            with CeilTimeout(timeout.sock_connect):
                _, proto = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as err:
            raise ClientConnectorError(req.connection_key, err) from err

        return cast(ResponseHandler, proto)
1200
1201
class NamedPipeConnector(BaseConnector):
    """Connector that speaks HTTP over a Windows named pipe.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3.7/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        # Named pipes exist only on the Windows proactor loop; fail fast
        # on any other loop implementation.
        if not isinstance(self._loop, asyncio.ProactorEventLoop):  # type: ignore
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the configured named pipe and return the protocol.

        Raises ClientConnectorError when the OS-level connect fails.
        """
        try:
            with CeilTimeout(timeout.sock_connect):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as err:
            raise ClientConnectorError(req.connection_key, err) from err

        return cast(ResponseHandler, proto)
1263