"""PipSession and supporting code, containing all pip-specific
network request configuration and behavior.
"""

# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False

import email.utils
import io
import json
import logging
import mimetypes
import os
import platform
import sys
import warnings

from pip._vendor import requests, six, urllib3
from pip._vendor.cachecontrol import CacheControlAdapter
from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
from pip._vendor.requests.models import Response
from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._vendor.urllib3.exceptions import InsecureRequestWarning

from pip import __version__
from pip._internal.network.auth import MultiDomainBasicAuth
from pip._internal.network.cache import SafeFileCache

# Import ssl from compat so the initial import occurs in only one place.
from pip._internal.utils.compat import has_tls, ipaddress
from pip._internal.utils.glibc import libc_ver
from pip._internal.utils.misc import (
    build_url_from_netloc,
    get_installed_version,
    parse_netloc,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.urls import url_to_path

if MYPY_CHECK_RUNNING:
    from typing import Iterator, List, Optional, Tuple, Union

    from pip._internal.models.link import Link

    SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]


logger = logging.getLogger(__name__)


# Ignore warning raised when using --trusted-host.
warnings.filterwarnings("ignore", category=InsecureRequestWarning)


SECURE_ORIGINS = [
    # protocol, hostname, port
    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]  # type: List[SecureOrigin]


# These are environment variables present when running under various
# CI systems.  For each variable, some CI systems that use the variable
# are indicated.  The collection was chosen so that for each of a number
# of popular systems, at least one of the environment variables is used.
# This list is used to provide some indication of and lower bound for
# CI traffic to PyPI.  Thus, it is okay if the list is not comprehensive.
# For more background, see: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
    # Azure Pipelines
    'BUILD_BUILDID',
    # Jenkins
    'BUILD_ID',
    # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
    'CI',
    # Explicit environment variable.
    'PIP_IS_CI',
)


def looks_like_ci():
    # type: () -> bool
    """
    Return whether it looks like pip is running under CI.
    """
    # We don't use the method of checking for a tty (e.g. using isatty())
    # because some CI systems mimic a tty (e.g. Travis CI).  Thus that
    # method doesn't provide definitive information in either direction.
    return any(name in os.environ for name in CI_ENVIRONMENT_VARIABLES)


def user_agent():
    """
    Return a string representing the user agent.

    The string has the form ``pip/<version> <json>``, where ``<json>`` is a
    compact, key-sorted JSON object describing the installer, the Python
    implementation, the platform and a few optional extras (distro, libc,
    OpenSSL and setuptools versions, CI indicator, user-supplied data).
    """
    data = {
        "installer": {"name": "pip", "version": __version__},
        "python": platform.python_version(),
        "implementation": {
            "name": platform.python_implementation(),
        },
    }

    if data["implementation"]["name"] == 'CPython':
        data["implementation"]["version"] = platform.python_version()
    elif data["implementation"]["name"] == 'PyPy':
        if sys.pypy_version_info.releaselevel == 'final':
            pypy_version_info = sys.pypy_version_info[:3]
        else:
            pypy_version_info = sys.pypy_version_info
        data["implementation"]["version"] = ".".join(
            [str(x) for x in pypy_version_info]
        )
    elif data["implementation"]["name"] == 'Jython':
        # Complete Guess
        data["implementation"]["version"] = platform.python_version()
    elif data["implementation"]["name"] == 'IronPython':
        # Complete Guess
        data["implementation"]["version"] = platform.python_version()

    if sys.platform.startswith("linux"):
        from pip._vendor import distro
        # Only keep the fields for which a non-empty value was reported.
        distro_infos = {
            key: value
            for key, value in zip(
                ["name", "version", "id"], distro.linux_distribution()
            )
            if value
        }
        libc = {
            key: value
            for key, value in zip(["lib", "version"], libc_ver())
            if value
        }
        if libc:
            distro_infos["libc"] = libc
        if distro_infos:
            data["distro"] = distro_infos

    if sys.platform.startswith("darwin") and platform.mac_ver()[0]:
        data["distro"] = {"name": "macOS", "version": platform.mac_ver()[0]}

    if platform.system():
        data.setdefault("system", {})["name"] = platform.system()

    if platform.release():
        data.setdefault("system", {})["release"] = platform.release()

    if platform.machine():
        data["cpu"] = platform.machine()

    if has_tls():
        import _ssl as ssl
        data["openssl_version"] = ssl.OPENSSL_VERSION

    setuptools_version = get_installed_version("setuptools")
    if setuptools_version is not None:
        data["setuptools_version"] = setuptools_version

    # Use None rather than False so as not to give the impression that
    # pip knows it is not being run under CI.  Rather, it is a null or
    # inconclusive result.  Also, we include some value rather than no
    # value to make it easier to know that the check has been run.
    data["ci"] = True if looks_like_ci() else None

    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
    if user_data is not None:
        data["user_data"] = user_data

    return "{data[installer][name]}/{data[installer][version]} {json}".format(
        data=data,
        json=json.dumps(data, separators=(",", ":"), sort_keys=True),
    )


class LocalFSAdapter(BaseAdapter):
    """Transport adapter that answers requests by reading a local file.

    The request URL is converted to a filesystem path with ``url_to_path``.
    """

    def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        """
        Build a Response for the file that ``request.url`` points at.

        :param request: The prepared request; only its ``url`` is used.
        :param stream: Unused, accepted for interface compatibility.
        :param timeout: Unused, accepted for interface compatibility.
        :param verify: Unused, accepted for interface compatibility.
        :param cert: Unused, accepted for interface compatibility.
        :param proxies: Unused, accepted for interface compatibility.
        :return: A Response with status 200 whose ``raw`` is the file opened
            in binary mode, or with status 404 (and a body describing the
            error) if the path cannot be stat'ed.
        """
        pathname = url_to_path(request.url)

        resp = Response()
        resp.status_code = 200
        resp.url = request.url

        try:
            stats = os.stat(pathname)
        except OSError as exc:
            # Expose the failure as a readable body instead of storing the
            # exception object itself in ``raw``: ``raw`` has to be file-like
            # for the response content to be readable.
            resp.status_code = 404
            resp.reason = type(exc).__name__
            message = "{}: {}".format(resp.reason, exc)
            if not isinstance(message, bytes):
                # Python 3: text must be encoded; on Python 2 the formatted
                # message is already a byte string.
                message = message.encode("utf8")
            resp.raw = io.BytesIO(message)
        else:
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
            resp.headers = CaseInsensitiveDict({
                "Content-Type": content_type,
                "Content-Length": stats.st_size,
                "Last-Modified": modified,
            })

            resp.raw = open(pathname, "rb")
            # Closing the response closes the underlying file.
            resp.close = resp.raw.close

        return resp

    def close(self):
        """Nothing to release at the adapter level."""
        pass


class InsecureHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that always calls ``cert_verify`` with ``verify=False``."""

    def cert_verify(self, conn, url, verify, cert):
        # The caller-supplied ``verify`` is deliberately ignored.
        super(InsecureHTTPAdapter, self).cert_verify(
            conn=conn, url=url, verify=False, cert=cert
        )


class InsecureCacheControlAdapter(CacheControlAdapter):
    """CacheControlAdapter that always calls ``cert_verify`` with
    ``verify=False``.
    """

    def cert_verify(self, conn, url, verify, cert):
        # The caller-supplied ``verify`` is deliberately ignored.
        super(InsecureCacheControlAdapter, self).cert_verify(
            conn=conn, url=url, verify=False, cert=cert
        )


class PipSession(requests.Session):
    """requests.Session configured with pip's user agent, authentication
    handler, retry policy, transport adapters and trusted-host handling.
    """

    # Default timeout applied by request() when the caller passes none.
    timeout = None  # type: Optional[int]

    def __init__(self, *args, **kwargs):
        """
        :param retries: Total number of retries allowed per request
            (default 0).
        :param cache: Value handed to SafeFileCache; when falsy, the
            non-caching adapters are used instead.
        :param trusted_hosts: Domains not to emit warnings for when not using
            HTTPS.
        :param index_urls: Passed to the MultiDomainBasicAuth handler.

        Remaining positional and keyword arguments are forwarded to
        requests.Session.
        """
        retries = kwargs.pop("retries", 0)
        cache = kwargs.pop("cache", None)
        trusted_hosts = kwargs.pop("trusted_hosts", [])  # type: List[str]
        index_urls = kwargs.pop("index_urls", None)

        super(PipSession, self).__init__(*args, **kwargs)

        # Namespace the attribute with "pip_" just in case to prevent
        # possible conflicts with the base class.
        self.pip_trusted_origins = []  # type: List[Tuple[str, Optional[int]]]

        # Attach our User Agent to the request
        self.headers["User-Agent"] = user_agent()

        # Attach our Authentication handler to the session
        self.auth = MultiDomainBasicAuth(index_urls=index_urls)

        # Create our urllib3.Retry instance which will allow us to customize
        # how we handle retries.
        retries = urllib3.Retry(
            # Set the total number of retries that a particular request can
            # have.
            total=retries,

            # A 503 error from PyPI typically means that the Fastly -> Origin
            # connection got interrupted in some way. A 503 error in general
            # is typically considered a transient error so we'll go ahead and
            # retry it.
            # A 500 may indicate transient error in Amazon S3
            # A 520 or 527 - may indicate transient error in CloudFlare
            status_forcelist=[500, 503, 520, 527],

            # Add a small amount of back off between failed requests in
            # order to prevent hammering the service.
            backoff_factor=0.25,
        )

        # Our Insecure HTTPAdapter disables HTTPS validation. It does not
        # support caching so we'll use it for all http:// URLs.
        # If caching is disabled, we will also use it for
        # https:// hosts that we've marked as ignoring
        # TLS errors for (trusted-hosts).
        insecure_adapter = InsecureHTTPAdapter(max_retries=retries)

        # We want to _only_ cache responses on securely fetched origins or when
        # the host is specified as trusted. We do this because
        # we can't validate the response of an insecurely/untrusted fetched
        # origin, and we don't want someone to be able to poison the cache and
        # require manual eviction from the cache to fix it.
        if cache:
            secure_adapter = CacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retries,
            )
            self._trusted_host_adapter = InsecureCacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retries,
            )
        else:
            secure_adapter = HTTPAdapter(max_retries=retries)
            self._trusted_host_adapter = insecure_adapter

        self.mount("https://", secure_adapter)
        self.mount("http://", insecure_adapter)

        # Enable file:// urls
        self.mount("file://", LocalFSAdapter())

        for host in trusted_hosts:
            self.add_trusted_host(host, suppress_logging=True)

    def update_index_urls(self, new_index_urls):
        # type: (List[str]) -> None
        """
        :param new_index_urls: New index urls to update the authentication
            handler with.
        """
        self.auth.index_urls = new_index_urls

    def add_trusted_host(self, host, source=None, suppress_logging=False):
        # type: (str, Optional[str], bool) -> None
        """
        Record a host as trusted and mount the trusted-host adapter for it.

        :param host: It is okay to provide a host that has previously been
            added.
        :param source: An optional source string, for logging where the host
            string came from.
        :param suppress_logging: When true, do not log that the host was
            added.
        """
        if not suppress_logging:
            msg = 'adding trusted host: {!r}'.format(host)
            if source is not None:
                msg += ' (from {})'.format(source)
            logger.info(msg)

        host_port = parse_netloc(host)
        if host_port not in self.pip_trusted_origins:
            self.pip_trusted_origins.append(host_port)

        self.mount(
            build_url_from_netloc(host) + '/',
            self._trusted_host_adapter
        )
        if not host_port[1]:
            # Mount wildcard ports for the same host.
            self.mount(
                build_url_from_netloc(host) + ':',
                self._trusted_host_adapter
            )

    def iter_secure_origins(self):
        # type: () -> Iterator[SecureOrigin]
        """
        Yield the built-in SECURE_ORIGINS followed by one origin per trusted
        host added to this session (any protocol; any port unless the host
        was added with an explicit port).
        """
        for secure_origin in SECURE_ORIGINS:
            yield secure_origin
        for host, port in self.pip_trusted_origins:
            yield ('*', host, '*' if port is None else port)

    def is_secure_origin(self, location):
        # type: (Link) -> bool
        """
        Return whether ``location`` matches one of the secure origins.

        :param location: The link to check; it is converted with ``str()``
            and parsed as a URL.
        :return: True if protocol, host and port match any origin yielded by
            iter_secure_origins(); otherwise a warning is logged and False is
            returned.
        """
        # Determine if this url used a secure transport mechanism
        parsed = urllib_parse.urlparse(str(location))
        origin_protocol, origin_host, origin_port = (
            parsed.scheme, parsed.hostname, parsed.port,
        )

        # The protocol to use to see if the protocol matches.
        # Don't count the repository type as part of the protocol: in
        # cases such as "git+ssh", only use "ssh". (I.e., Only verify against
        # the last scheme.)
        origin_protocol = origin_protocol.rsplit('+', 1)[-1]

        # Determine if our origin is a secure origin by looking through our
        # hardcoded list of secure origins, as well as any additional ones
        # configured on this session via add_trusted_host().
        for secure_origin in self.iter_secure_origins():
            secure_protocol, secure_host, secure_port = secure_origin
            if origin_protocol != secure_protocol and secure_protocol != "*":
                continue

            try:
                addr = ipaddress.ip_address(
                    None
                    if origin_host is None
                    else six.ensure_text(origin_host)
                )
                network = ipaddress.ip_network(
                    six.ensure_text(secure_host)
                )
            except ValueError:
                # We don't have both a valid address or a valid network, so
                # we'll check this origin against hostnames.
                if (
                    origin_host and
                    origin_host.lower() != secure_host.lower() and
                    secure_host != "*"
                ):
                    continue
            else:
                # We have a valid address and network, so see if the address
                # is contained within the network.
                if addr not in network:
                    continue

            # Check to see if the port matches.
            if (
                origin_port != secure_port and
                secure_port != "*" and
                secure_port is not None
            ):
                continue

            # If we've gotten here, then this origin matches the current
            # secure origin and we should return True
            return True

        # If we've gotten to this point, then the origin isn't secure and we
        # will not accept it as a valid location to search. We will however
        # log a warning that we are ignoring it.
        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            origin_host,
            origin_host,
        )

        return False

    def request(self, method, url, *args, **kwargs):
        """
        Send a request, using the session-level ``timeout`` when the caller
        does not pass one; all other arguments go to requests.Session.request
        unchanged.
        """
        # Allow setting a default timeout on a session
        kwargs.setdefault("timeout", self.timeout)

        # Dispatch the actual request
        return super(PipSession, self).request(method, url, *args, **kwargs)