1"""PipSession and supporting code, containing all pip-specific
2network request configuration and behavior.
3"""
4
5# The following comment should be removed at some point in the future.
6# mypy: disallow-untyped-defs=False
7
8import email.utils
9import json
10import logging
11import mimetypes
12import os
13import platform
14import sys
15import warnings
16
17from pip._vendor import requests, six, urllib3
18from pip._vendor.cachecontrol import CacheControlAdapter
19from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
20from pip._vendor.requests.models import Response
21from pip._vendor.requests.structures import CaseInsensitiveDict
22from pip._vendor.six.moves.urllib import parse as urllib_parse
23from pip._vendor.urllib3.exceptions import InsecureRequestWarning
24
25from pip import __version__
26from pip._internal.network.auth import MultiDomainBasicAuth
27from pip._internal.network.cache import SafeFileCache
28
29# Import ssl from compat so the initial import occurs in only one place.
30from pip._internal.utils.compat import has_tls, ipaddress
31from pip._internal.utils.glibc import libc_ver
32from pip._internal.utils.misc import (
33    build_url_from_netloc,
34    get_installed_version,
35    parse_netloc,
36)
37from pip._internal.utils.typing import MYPY_CHECK_RUNNING
38from pip._internal.utils.urls import url_to_path
39
40if MYPY_CHECK_RUNNING:
41    from typing import Iterator, List, Optional, Tuple, Union
42
43    from pip._internal.models.link import Link
44
45    SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
46
47
48logger = logging.getLogger(__name__)
49
50
51# Ignore warning raised when using --trusted-host.
52warnings.filterwarnings("ignore", category=InsecureRequestWarning)
53
54
# Origins that are always considered secure.  Each entry is a
# (protocol, hostname, port) tuple that PipSession.is_secure_origin()
# compares against the URL being checked:
#   * "*" in any position matches every value of that field;
#   * the hostname may also be an IP network in CIDR notation, in which
#     case an origin whose host is an IP address matches when that address
#     lies inside the network;
#   * a port of None, like "*", puts no restriction on the origin's port.
SECURE_ORIGINS = [
    # protocol, hostname, port
    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]  # type: List[SecureOrigin]
66
67
68# These are environment variables present when running under various
69# CI systems.  For each variable, some CI systems that use the variable
70# are indicated.  The collection was chosen so that for each of a number
71# of popular systems, at least one of the environment variables is used.
72# This list is used to provide some indication of and lower bound for
73# CI traffic to PyPI.  Thus, it is okay if the list is not comprehensive.
74# For more background, see: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
    # Set by Azure Pipelines.
    'BUILD_BUILDID',
    # Set by Jenkins.
    'BUILD_ID',
    # Set by AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI.
    'CI',
    # Explicit opt-in for systems that set none of the above.
    'PIP_IS_CI',
)


def looks_like_ci():
    # type: () -> bool
    """
    Tell whether the environment suggests pip is running under CI.

    Only the presence of a variable named in CI_ENVIRONMENT_VARIABLES is
    checked; its value is not inspected.
    """
    # A tty check (isatty()) is deliberately not used: some CI systems,
    # e.g. Travis CI, emulate a tty, so it would not be conclusive in
    # either direction.
    for variable in CI_ENVIRONMENT_VARIABLES:
        if variable in os.environ:
            return True
    return False
96
97
def user_agent():
    # type: () -> str
    """
    Build the User-Agent string for pip's HTTP requests.

    The result is ``pip/<version>`` followed by a space and a compact,
    key-sorted JSON document describing the interpreter, operating system,
    TLS library, setuptools version and CI status.
    """
    implementation_name = platform.python_implementation()
    implementation = {"name": implementation_name}

    if implementation_name == 'PyPy':
        pypy_info = sys.pypy_version_info
        if pypy_info.releaselevel == 'final':
            # Final releases are reported as major.minor.micro only.
            pypy_info = pypy_info[:3]
        implementation["version"] = ".".join(str(part) for part in pypy_info)
    elif implementation_name in ('CPython', 'Jython', 'IronPython'):
        # For Jython and IronPython this is a complete guess.
        implementation["version"] = platform.python_version()

    data = {
        "installer": {"name": "pip", "version": __version__},
        "python": platform.python_version(),
        "implementation": implementation,
    }

    if sys.platform.startswith("linux"):
        from pip._vendor import distro

        # Keep only the fields that actually carry a value.
        distro_fields = zip(
            ["name", "version", "id"], distro.linux_distribution()
        )
        distro_infos = {key: value for key, value in distro_fields if value}
        libc_fields = zip(["lib", "version"], libc_ver())
        libc = {key: value for key, value in libc_fields if value}
        if libc:
            distro_infos["libc"] = libc
        if distro_infos:
            data["distro"] = distro_infos

    if sys.platform.startswith("darwin"):
        mac_release = platform.mac_ver()[0]
        if mac_release:
            data["distro"] = {"name": "macOS", "version": mac_release}

    # The "system" key is only emitted when at least one field is known.
    system_info = {}
    system_name = platform.system()
    if system_name:
        system_info["name"] = system_name
    system_release = platform.release()
    if system_release:
        system_info["release"] = system_release
    if system_info:
        data["system"] = system_info

    machine = platform.machine()
    if machine:
        data["cpu"] = machine

    if has_tls():
        import _ssl as ssl
        data["openssl_version"] = ssl.OPENSSL_VERSION

    setuptools_version = get_installed_version("setuptools")
    if setuptools_version is not None:
        data["setuptools_version"] = setuptools_version

    # None (not False) is reported when no CI marker was found: the check
    # is inconclusive rather than negative.  The key is always present so
    # that it is evident the check ran.
    data["ci"] = None
    if looks_like_ci():
        data["ci"] = True

    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
    if user_data is not None:
        data["user_data"] = user_data

    payload = json.dumps(data, separators=(",", ":"), sort_keys=True)
    return "{data[installer][name]}/{data[installer][version]} {json}".format(
        data=data,
        json=payload,
    )
176
177
class LocalFSAdapter(BaseAdapter):
    """Transport adapter that answers requests from the local filesystem."""

    def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """
        Build a Response for the local file that ``request.url`` refers to.

        :param request: The prepared request; only its ``url`` is used.
        :param stream: Accepted for interface compatibility; ignored.
        :param timeout: Accepted for interface compatibility; ignored.
        :param verify: Accepted for interface compatibility; ignored.
        :param cert: Accepted for interface compatibility; ignored.
        :param proxies: Accepted for interface compatibility; ignored.
        :return: A Response with status 200 whose ``raw`` is the file opened
            in binary mode, or a Response with status 404 when the path
            cannot be stat'ed.
        """
        import io

        pathname = url_to_path(request.url)

        resp = Response()
        resp.status_code = 200
        resp.url = request.url

        try:
            stats = os.stat(pathname)
        except OSError as exc:
            # Response.raw has to be a file-like object: reading the body
            # or closing the response calls methods on it.  Storing the
            # exception itself would make those calls fail with
            # AttributeError, so expose the error text as the body instead.
            resp.status_code = 404
            resp.reason = type(exc).__name__
            resp.raw = io.BytesIO(
                six.ensure_binary(
                    "{}: {}".format(resp.reason, exc), errors="replace"
                )
            )
        else:
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            # Fall back to text/plain when the type cannot be guessed from
            # the file name.
            content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
            resp.headers = CaseInsensitiveDict({
                "Content-Type": content_type,
                "Content-Length": stats.st_size,
                "Last-Modified": modified,
            })

            # The file stays open for the consumer of the response; closing
            # the response closes the file.
            resp.raw = open(pathname, "rb")
            resp.close = resp.raw.close

        return resp

    def close(self):
        """Do nothing; the adapter itself holds no resources."""
        pass
209
210
class InsecureHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that always asks for certificate verification to be off."""

    def cert_verify(self, conn, url, verify, cert):
        """Delegate to the base class with ``verify`` forced to False."""
        # The caller-supplied ``verify`` value is deliberately discarded.
        parent = super(InsecureHTTPAdapter, self)
        parent.cert_verify(conn=conn, url=url, verify=False, cert=cert)
217
218
class InsecureCacheControlAdapter(CacheControlAdapter):
    """CacheControlAdapter that always asks for verification to be off."""

    def cert_verify(self, conn, url, verify, cert):
        """Delegate to the base class with ``verify`` forced to False."""
        # The caller-supplied ``verify`` value is deliberately discarded.
        parent = super(InsecureCacheControlAdapter, self)
        parent.cert_verify(conn=conn, url=url, verify=False, cert=cert)
225
226
class PipSession(requests.Session):
    """
    A requests Session carrying pip's user agent, authentication handler,
    retry policy, optional response cache and trusted-host handling.
    """

    # Default timeout that request() applies when the caller gives none.
    timeout = None  # type: Optional[int]

    def __init__(self, *args, **kwargs):
        """
        :param retries: Total number of retries allowed for a request
            (default 0).
        :param cache: Value handed to SafeFileCache; a false value disables
            response caching.
        :param trusted_hosts: Domains not to emit warnings for when not using
            HTTPS.
        :param index_urls: Index URLs handed to the authentication handler.

        All other arguments are forwarded to the base Session.
        """
        index_urls = kwargs.pop("index_urls", None)
        trusted_hosts = kwargs.pop("trusted_hosts", [])  # type: List[str]
        cache = kwargs.pop("cache", None)
        retry_total = kwargs.pop("retries", 0)

        super(PipSession, self).__init__(*args, **kwargs)

        # The "pip_" prefix guards against a clash with an attribute of
        # the base class.
        self.pip_trusted_origins = []  # type: List[Tuple[str, Optional[int]]]

        # Identify ourselves with pip's own User-Agent.
        self.headers["User-Agent"] = user_agent()

        # Install pip's authentication handler.
        self.auth = MultiDomainBasicAuth(index_urls=index_urls)

        # Retry policy shared by every HTTP(S) adapter created below.
        retry_policy = urllib3.Retry(
            # Upper bound on the retries a single request may use.
            total=retry_total,

            # 503 from PyPI usually means the Fastly -> Origin connection
            # was interrupted, and is generally a transient error anyway.
            # 500 may indicate a transient error in Amazon S3.
            # 520 and 527 may indicate a transient error in CloudFlare.
            status_forcelist=[500, 503, 520, 527],

            # Back off a little between attempts so as not to hammer the
            # service.
            backoff_factor=0.25,
        )

        # This adapter disables HTTPS validation and does no caching.  It
        # serves every http:// URL and, when caching is disabled, also the
        # https:// hosts marked as trusted.
        insecure_adapter = InsecureHTTPAdapter(max_retries=retry_policy)

        # Responses are cached _only_ for securely fetched origins or for
        # hosts explicitly marked as trusted: anything else cannot be
        # validated, and a poisoned cache would need manual eviction.
        if not cache:
            secure_adapter = HTTPAdapter(max_retries=retry_policy)
            self._trusted_host_adapter = insecure_adapter
        else:
            secure_adapter = CacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
            )
            self._trusted_host_adapter = InsecureCacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
            )

        for prefix, adapter in (
            ("https://", secure_adapter),
            ("http://", insecure_adapter),
            # Enable file:// urls
            ("file://", LocalFSAdapter()),
        ):
            self.mount(prefix, adapter)

        for trusted_host in trusted_hosts:
            self.add_trusted_host(trusted_host, suppress_logging=True)

    def update_index_urls(self, new_index_urls):
        # type: (List[str]) -> None
        """
        Replace the index urls known to the authentication handler.

        :param new_index_urls: The index urls to store on the handler.
        """
        self.auth.index_urls = new_index_urls

    def add_trusted_host(self, host, source=None, suppress_logging=False):
        # type: (str, Optional[str], bool) -> None
        """
        Record a host as trusted and route its URLs to the trusted-host
        adapter.

        :param host: The host, optionally with a port.  Passing a host that
            was added before is fine.
        :param source: Optional description of where the host string came
            from; only used in the log message.
        :param suppress_logging: When true, no log message is emitted.
        """
        if not suppress_logging:
            origin_note = '' if source is None else ' (from {})'.format(source)
            logger.info('adding trusted host: {!r}'.format(host) + origin_note)

        host_port = parse_netloc(host)
        if host_port not in self.pip_trusted_origins:
            self.pip_trusted_origins.append(host_port)

        base_url = build_url_from_netloc(host)
        prefixes = [base_url + '/']
        if not host_port[1]:
            # No port was given: also cover every explicit port of the host.
            prefixes.append(base_url + ':')
        for prefix in prefixes:
            self.mount(prefix, self._trusted_host_adapter)

    def iter_secure_origins(self):
        # type: () -> Iterator[SecureOrigin]
        """
        Yield the built-in secure origins followed by one wildcard-protocol
        origin for each trusted host of this session.
        """
        for builtin_origin in SECURE_ORIGINS:
            yield builtin_origin
        for host, port in self.pip_trusted_origins:
            if port is None:
                port = '*'
            yield ('*', host, port)

    def is_secure_origin(self, location):
        # type: (Link) -> bool
        """
        Decide whether ``location`` matches one of the secure origins.

        A warning is logged when it matches none.

        :param location: The link to check; its string form is parsed as
            a URL.
        :return: True when protocol, host and port all match some secure
            origin, False otherwise.
        """
        parsed = urllib_parse.urlparse(str(location))
        origin_host = parsed.hostname
        origin_port = parsed.port

        # The repository type is not part of the protocol: for a scheme
        # such as "git+ssh" only the last part ("ssh") is compared.
        origin_protocol = parsed.scheme.rsplit('+', 1)[-1]

        # The origin host does not change between candidates, so work out
        # once whether it is an IP address.
        origin_addr = None
        if origin_host is not None:
            try:
                origin_addr = ipaddress.ip_address(
                    six.ensure_text(origin_host)
                )
            except ValueError:
                origin_addr = None

        # Candidates are the hardcoded secure origins plus the trusted
        # hosts configured on this session.
        for candidate in self.iter_secure_origins():
            secure_protocol, secure_host, secure_port = candidate

            if secure_protocol not in ("*", origin_protocol):
                continue

            network = None
            if origin_addr is not None:
                try:
                    network = ipaddress.ip_network(
                        six.ensure_text(secure_host)
                    )
                except ValueError:
                    network = None

            if network is None:
                # There is no address/network pair to compare, so fall
                # back to comparing hostnames.
                host_mismatch = (
                    origin_host and
                    secure_host != "*" and
                    origin_host.lower() != secure_host.lower()
                )
                if host_mismatch:
                    continue
            elif origin_addr not in network:
                # Valid address and network, but the address lies outside.
                continue

            # "*" and None both stand for "any port".
            any_port = secure_port is None or secure_port == "*"
            if not any_port and origin_port != secure_port:
                continue

            # Protocol, host and port all matched this secure origin.
            return True

        # Nothing matched: the origin is not accepted as a location to
        # search, and the user is told that it is being ignored.
        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            origin_host,
            origin_host,
        )

        return False

    def request(self, method, url, *args, **kwargs):
        """
        Send a request, applying the session's default timeout when the
        caller did not pass ``timeout``.
        """
        if "timeout" not in kwargs:
            kwargs["timeout"] = self.timeout

        # Hand over to the base Session for the actual request.
        return super(PipSession, self).request(method, url, *args, **kwargs)
429