1"""Support for installing and building the "wheel" binary package format. 2""" 3 4# The following comment should be removed at some point in the future. 5# mypy: strict-optional=False 6 7from __future__ import absolute_import 8 9import collections 10import compileall 11import csv 12import logging 13import os.path 14import re 15import shutil 16import stat 17import sys 18import warnings 19from base64 import urlsafe_b64encode 20from zipfile import ZipFile 21 22from pipenv.patched.notpip._vendor import pkg_resources 23from pipenv.patched.notpip._vendor.distlib.scripts import ScriptMaker 24from pipenv.patched.notpip._vendor.distlib.util import get_export_entry 25from pipenv.patched.notpip._vendor.six import StringIO 26 27from pipenv.patched.notpip._internal.exceptions import InstallationError 28from pipenv.patched.notpip._internal.locations import get_major_minor_version 29from pipenv.patched.notpip._internal.utils.misc import captured_stdout, ensure_dir, hash_file 30from pipenv.patched.notpip._internal.utils.temp_dir import TempDirectory 31from pipenv.patched.notpip._internal.utils.typing import MYPY_CHECK_RUNNING 32from pipenv.patched.notpip._internal.utils.unpacking import unpack_file 33from pipenv.patched.notpip._internal.utils.wheel import parse_wheel 34 35if MYPY_CHECK_RUNNING: 36 from email.message import Message 37 from typing import ( 38 Dict, List, Optional, Sequence, Tuple, IO, Text, Any, 39 Iterable, Callable, Set, 40 ) 41 42 from pipenv.patched.notpip._internal.models.scheme import Scheme 43 44 InstalledCSVRow = Tuple[str, ...] 


logger = logging.getLogger(__name__)


def normpath(src, p):
    # type: (str, str) -> str
    """Return ``src`` expressed relative to ``p`` with '/' separators."""
    return os.path.relpath(src, p).replace(os.path.sep, '/')


def rehash(path, blocksize=1 << 20):
    # type: (str, int) -> Tuple[str, str]
    """Return (encoded_digest, length) for path.

    The digest is the urlsafe-base64 encoding (padding stripped) of the
    hash object returned by ``hash_file``, prefixed with ``sha256=``.
    The length is returned as a string.
    """
    h, length = hash_file(path, blocksize)
    digest = 'sha256=' + urlsafe_b64encode(
        h.digest()
    ).decode('latin1').rstrip('=')
    # unicode/str python2 issues
    return (digest, str(length))  # type: ignore


def open_for_csv(name, mode):
    # type: (str, Text) -> IO[Any]
    """Open ``name`` in the way the ``csv`` module expects.

    On Python 2 the file is opened in binary mode; on Python 3 it is opened
    in text mode with ``newline=''``.
    """
    if sys.version_info[0] < 3:
        open_kwargs = {}  # type: Dict[str, Any]
        binary_flag = 'b'
    else:
        open_kwargs = {'newline': ''}  # type: Dict[str, Any]
        binary_flag = ''
    return open(name, mode + binary_flag, **open_kwargs)


def fix_script(path):
    # type: (str) -> Optional[bool]
    """Replace #!python with #!/path/to/python

    Return True if the file was changed, False if its first line does not
    start with ``#!python``, and None if ``path`` is not a regular file.
    """
    # XXX RECORD hashes will need to be updated
    if os.path.isfile(path):
        with open(path, 'rb') as script:
            firstline = script.readline()
            if not firstline.startswith(b'#!python'):
                return False
            exename = sys.executable.encode(sys.getfilesystemencoding())
            firstline = b'#!' + exename + os.linesep.encode("ascii")
            rest = script.read()
        with open(path, 'wb') as script:
            script.write(firstline)
            script.write(rest)
        return True
    return None


def wheel_root_is_purelib(metadata):
    # type: (Message) -> bool
    """Return True when the ``Root-Is-Purelib`` field is "true"
    (case-insensitive); a missing field counts as False.
    """
    return metadata.get("Root-Is-Purelib", "").lower() == "true"


def get_entrypoints(filename):
    # type: (str) -> Tuple[Dict[str, str], Dict[str, str]]
    """Parse an entry_points.txt file.

    Return a ``(console, gui)`` pair of dicts mapping script name to the
    entry point target string. Both dicts are empty when ``filename`` does
    not exist.
    """
    if not os.path.exists(filename):
        return {}, {}

    # This is done because you can pass a string to entry_points wrappers which
    # means that they may or may not be valid INI files. The attempt here is to
    # strip leading and trailing whitespace in order to make them valid INI
    # files.
    with open(filename) as fp:
        data = StringIO()
        for line in fp:
            data.write(line.strip())
            data.write("\n")
        data.seek(0)

    # get the entry points and then the script names
    entry_points = pkg_resources.EntryPoint.parse_map(data)
    console = entry_points.get('console_scripts', {})
    gui = entry_points.get('gui_scripts', {})

    def _split_ep(s):
        # type: (pkg_resources.EntryPoint) -> Tuple[str, str]
        """get the string representation of EntryPoint,
        remove space and split on '='
        """
        split_parts = str(s).replace(" ", "").split("=")
        return split_parts[0], split_parts[1]

    # convert the EntryPoint objects into strings with module:function
    console = dict(_split_ep(v) for v in console.values())
    gui = dict(_split_ep(v) for v in gui.values())
    return console, gui


def message_about_scripts_not_on_PATH(scripts):
    # type: (Sequence[str]) -> Optional[str]
    """Determine if any scripts are not on PATH and format a warning.
    Returns a warning message if one or more scripts are not on PATH,
    otherwise None.
    """
    if not scripts:
        return None

    # Group scripts by the path they were installed in
    grouped_by_dir = collections.defaultdict(set)  # type: Dict[str, Set[str]]
    for destfile in scripts:
        parent_dir = os.path.dirname(destfile)
        script_name = os.path.basename(destfile)
        grouped_by_dir[parent_dir].add(script_name)

    # We don't want to warn for directories that are on PATH.
    not_warn_dirs = [
        os.path.normcase(i).rstrip(os.sep) for i in
        os.environ.get("PATH", "").split(os.pathsep)
    ]
    # If an executable sits with sys.executable, we don't warn for it.
    #     This covers the case of venv invocations without activating the venv.
    not_warn_dirs.append(os.path.normcase(os.path.dirname(sys.executable)))
    warn_for = {
        parent_dir: scripts for parent_dir, scripts in grouped_by_dir.items()
        if os.path.normcase(parent_dir) not in not_warn_dirs
    }  # type: Dict[str, Set[str]]
    if not warn_for:
        return None

    # Format a message
    msg_lines = []
    for parent_dir, dir_scripts in warn_for.items():
        sorted_scripts = sorted(dir_scripts)  # type: List[str]
        if len(sorted_scripts) == 1:
            start_text = "script {} is".format(sorted_scripts[0])
        else:
            start_text = "scripts {} are".format(
                ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
            )

        msg_lines.append(
            "The {} installed in '{}' which is not on PATH."
            .format(start_text, parent_dir)
        )

    last_line_fmt = (
        "Consider adding {} to PATH or, if you prefer "
        "to suppress this warning, use --no-warn-script-location."
    )
    if len(msg_lines) == 1:
        msg_lines.append(last_line_fmt.format("this directory"))
    else:
        msg_lines.append(last_line_fmt.format("these directories"))

    # Add a note if any directory starts with ~
    warn_for_tilde = any(
        i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i
    )
    if warn_for_tilde:
        tilde_warning_msg = (
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )
        msg_lines.append(tilde_warning_msg)

    # Returns the formatted multiline message
    return "\n".join(msg_lines)


def sorted_outrows(outrows):
    # type: (Iterable[InstalledCSVRow]) -> List[InstalledCSVRow]
    """Return the given rows of a RECORD file in sorted order.

    Each row is a 3-tuple (path, hash, size) and corresponds to a record of
    a RECORD file (see PEP 376 and PEP 427 for details).  For the rows
    passed to this function, the size can be an integer as an int or string,
    or the empty string.
    """
    # Normally, there should only be one row per path, in which case the
    # second and third elements don't come into play when sorting.
    # However, in cases in the wild where a path might happen to occur twice,
    # we don't want the sort operation to trigger an error (but still want
    # determinism).  Since the third element can be an int or string, we
    # coerce each element to a string to avoid a TypeError in this case.
    # For additional background, see--
    # https://github.com/pypa/pip/issues/5868
    return sorted(outrows, key=lambda row: tuple(str(x) for x in row))


def get_csv_rows_for_installed(
    old_csv_rows,  # type: Iterable[List[str]]
    installed,  # type: Dict[str, str]
    changed,  # type: Set[str]
    generated,  # type: List[str]
    lib_dir,  # type: str
):
    # type: (...) -> List[InstalledCSVRow]
    """Build the rows of the RECORD file for the installed distribution.

    :param old_csv_rows: Rows read from the RECORD shipped in the wheel.
    :param installed: A map from archive RECORD path to installation RECORD
        path. Entries matched by a row are popped from this dict; whatever
        remains is emitted with an empty hash and size.
    :param changed: Paths of installed files that were modified during
        installation and therefore need a fresh hash and size. Entries are
        filesystem paths; an entry equal to the row's RECORD path is also
        honoured.
    :param generated: Filesystem paths of files created during installation;
        each is hashed and recorded relative to ``lib_dir``.
    :param lib_dir: Directory that installation RECORD paths are relative to.
    :return: The new RECORD rows (unsorted).
    """
    # ``changed`` holds filesystem paths while RECORD rows hold paths
    # relative to lib_dir, so both sides are normalised to absolute paths
    # before comparing.
    changed_fs_paths = {
        os.path.normcase(os.path.abspath(path)) for path in changed
    }

    installed_rows = []  # type: List[InstalledCSVRow]
    for old_row in old_csv_rows:
        if not old_row:
            # csv.reader yields [] for blank lines; there is nothing to
            # record for them.
            continue
        if len(old_row) > 3:
            logger.warning(
                'RECORD line has more than three elements: %s', old_row
            )
        # Make a copy because we are mutating the row.
        row = list(old_row)
        new_path = installed.pop(row[0], row[0])
        row[0] = new_path

        fs_path = os.path.abspath(os.path.join(lib_dir, new_path))
        if (
            new_path in changed or
            os.path.normcase(fs_path) in changed_fs_paths
        ):
            # Hash the file where it was actually installed, not a path
            # relative to the current working directory.
            digest, length = rehash(fs_path)
            # Rows with fewer than three fields are padded so the hash and
            # size can be stored.
            row.extend([''] * (3 - len(row)))
            row[1] = digest
            row[2] = length
        installed_rows.append(tuple(row))

    for f in generated:
        digest, length = rehash(f)
        installed_rows.append((normpath(f, lib_dir), digest, str(length)))
    for f in installed:
        installed_rows.append((installed[f], '', ''))
    return installed_rows


class MissingCallableSuffix(Exception):
    """Raised when a script entry point specification has no callable
    suffix."""
    pass


def _raise_for_invalid_entrypoint(specification):
    # type: (str) -> None
    """Raise MissingCallableSuffix if ``specification`` parses to an export
    entry whose ``suffix`` is None."""
    entry = get_export_entry(specification)
    if entry is not None and entry.suffix is None:
        raise MissingCallableSuffix(str(entry))


class PipScriptMaker(ScriptMaker):
    """ScriptMaker that validates the entry point before generating it."""

    def make(self, specification, options=None):
        # type: (str, Dict[str, Any]) -> List[str]
        _raise_for_invalid_entrypoint(specification)
        return super(PipScriptMaker, self).make(specification, options)


def install_unpacked_wheel(
    name,  # type: str
    wheeldir,  # type: str
    wheel_zip,  # type: ZipFile
    scheme,  # type: Scheme
    req_description,  # type: str
    pycompile=True,  # type: bool
    warn_script_location=True  # type: bool
):
    # type: (...) -> None
    """Install a wheel.

    :param name: Name of the project to install
    :param wheeldir: Base directory of the unpacked wheel
    :param wheel_zip: open ZipFile for wheel being installed
    :param scheme: Distutils scheme dictating the install directories
    :param req_description: String used in place of the requirement, for
        logging
    :param pycompile: Whether to byte-compile installed Python files
    :param warn_script_location: Whether to check that scripts are installed
        into a directory on PATH
    :raises UnsupportedWheel:
        * when the directory holds an unpacked wheel with incompatible
          Wheel-Version
        * when the .dist-info dir does not match the wheel
    :raises InstallationError: when a script entry point has no callable
        suffix
    """
    # TODO: Investigate and break this up.
    # TODO: Look into moving this into a dedicated class for representing an
    #       installation.

    source = wheeldir.rstrip(os.path.sep) + os.path.sep

    info_dir, metadata = parse_wheel(wheel_zip, name)

    if wheel_root_is_purelib(metadata):
        lib_dir = scheme.purelib
    else:
        lib_dir = scheme.platlib

    subdirs = os.listdir(source)
    data_dirs = [s for s in subdirs if s.endswith('.data')]

    # Record details of the files moved
    #   installed = files copied from the wheel to the destination
    #   changed = files changed while installing (scripts #! line typically)
    #   generated = files newly generated during the install (script wrappers)
    installed = {}  # type: Dict[str, str]
    changed = set()
    generated = []  # type: List[str]

    # Compile all of the pyc files that we're going to be installing
    if pycompile:
        with captured_stdout() as stdout:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                compileall.compile_dir(source, force=True, quiet=True)
        logger.debug(stdout.getvalue())

    def record_installed(srcfile, destfile, modified=False):
        # type: (str, str, bool) -> None
        """Map archive RECORD paths to installation RECORD paths."""
        oldpath = normpath(srcfile, wheeldir)
        newpath = normpath(destfile, lib_dir)
        installed[oldpath] = newpath
        if modified:
            # Stored as a filesystem path; get_csv_rows_for_installed
            # resolves RECORD paths against lib_dir before comparing.
            changed.add(destfile)

    def clobber(
        source,  # type: str
        dest,  # type: str
        is_base,  # type: bool
        fixer=None,  # type: Optional[Callable[[str], Any]]
        filter=None  # type: Optional[Callable[[str], bool]]
    ):
        # type: (...) -> None
        """Copy the tree under ``source`` into ``dest``.

        Files for which ``filter`` returns true are skipped; ``fixer`` is
        run on each copied file and its result marks the file as modified.
        When ``is_base`` is true, top-level ``*.data`` directories are not
        descended into.
        """
        ensure_dir(dest)  # common for the 'include' path

        for dirpath, dirnames, files in os.walk(source):
            basedir = dirpath[len(source):].lstrip(os.path.sep)
            destdir = os.path.join(dest, basedir)
            if is_base and basedir == '':
                # Prune in place so os.walk does not recurse into them.
                dirnames[:] = [
                    s for s in dirnames if not s.endswith('.data')
                ]
            for f in files:
                # Skip unwanted files
                if filter and filter(f):
                    continue
                srcfile = os.path.join(dirpath, f)
                destfile = os.path.join(dest, basedir, f)
                # directory creation is lazy and after the file filtering above
                # to ensure we don't install empty dirs; empty dirs can't be
                # uninstalled.
                ensure_dir(destdir)

                # copyfile (called below) truncates the destination if it
                # exists and then writes the new contents. This is fine in most
                # cases, but can cause a segfault if pip has loaded a shared
                # object (e.g. from pyopenssl through its vendored urllib3)
                # Since the shared object is mmap'd an attempt to call a
                # symbol in it will then cause a segfault. Unlinking the file
                # allows writing of new contents while allowing the process to
                # continue to use the old copy.
                if os.path.exists(destfile):
                    os.unlink(destfile)

                # We use copyfile (not move, copy, or copy2) to be extra sure
                # that we are not moving directories over (copyfile fails for
                # directories) as well as to ensure that we are not copying
                # over any metadata because we want more control over what
                # metadata we actually copy over.
                shutil.copyfile(srcfile, destfile)

                # Copy over the metadata for the file, currently this only
                # includes the atime and mtime.
                st = os.stat(srcfile)
                if hasattr(os, "utime"):
                    os.utime(destfile, (st.st_atime, st.st_mtime))

                # If our file is executable, then make our destination file
                # executable.
                if os.access(srcfile, os.X_OK):
                    permissions = (
                        st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
                    )
                    os.chmod(destfile, permissions)

                was_modified = False
                if fixer:
                    was_modified = fixer(destfile)
                record_installed(srcfile, destfile, was_modified)

    clobber(source, lib_dir, True)

    dest_info_dir = os.path.join(lib_dir, info_dir)

    # Get the defined entry points
    ep_file = os.path.join(dest_info_dir, 'entry_points.txt')
    console, gui = get_entrypoints(ep_file)

    def is_entrypoint_wrapper(name):
        # type: (str) -> bool
        """Return True if ``name`` is a wrapper script for a declared
        console or GUI entry point."""
        # EP, EP.exe and EP-script.py are scripts generated for
        # entry point EP by setuptools
        if name.lower().endswith('.exe'):
            matchname = name[:-4]
        elif name.lower().endswith('-script.py'):
            matchname = name[:-10]
        elif name.lower().endswith(".pya"):
            matchname = name[:-4]
        else:
            matchname = name
        # Ignore setuptools-generated scripts
        return (matchname in console or matchname in gui)

    for datadir in data_dirs:
        for subdir in os.listdir(os.path.join(wheeldir, datadir)):
            # Reset both hooks for every subdirectory so that the
            # script-only filter does not leak into sibling directories
            # listed after 'scripts'.
            fixer = None
            filter = None
            if subdir == 'scripts':
                fixer = fix_script
                filter = is_entrypoint_wrapper
            data_source = os.path.join(wheeldir, datadir, subdir)
            dest = getattr(scheme, subdir)
            clobber(data_source, dest, False, fixer=fixer, filter=filter)

    maker = PipScriptMaker(None, scheme.scripts)

    # Ensure old scripts are overwritten.
    # See https://github.com/pypa/pip/issues/1800
    maker.clobber = True

    # Ensure we don't generate any variants for scripts because this is almost
    # never what somebody wants.
    # See https://bitbucket.org/pypa/distlib/issue/35/
    maker.variants = {''}

    # This is required because otherwise distlib creates scripts that are not
    # executable.
    # See https://bitbucket.org/pypa/distlib/issue/32/
    maker.set_mode = True

    scripts_to_generate = []

    # Special case pip and setuptools to generate versioned wrappers
    #
    # The issue is that some projects (specifically, pip and setuptools) use
    # code in setup.py to create "versioned" entry points - pip2.7 on Python
    # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into
    # the wheel metadata at build time, and so if the wheel is installed with
    # a *different* version of Python the entry points will be wrong. The
    # correct fix for this is to enhance the metadata to be able to describe
    # such versioned entry points, but that won't happen till Metadata 2.0 is
    # available.
    # In the meantime, projects using versioned entry points will either have
    # incorrect versioned entry points, or they will not be able to distribute
    # "universal" wheels (i.e., they will need a wheel per Python version).
    #
    # Because setuptools and pip are bundled with _ensurepip and virtualenv,
    # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we
    # override the versioned entry points in the wheel and generate the
    # correct ones. This code is purely a short-term measure until Metadata 2.0
    # is available.
    #
    # To add the level of hack in this section of code, in order to support
    # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment
    # variable which will control which version scripts get installed.
    #
    # ENSUREPIP_OPTIONS=altinstall
    #   - Only pipX.Y and easy_install-X.Y will be generated and installed
    # ENSUREPIP_OPTIONS=install
    #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
    #     that this option is technically if ENSUREPIP_OPTIONS is set and is
    #     not altinstall
    # DEFAULT
    #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
    #     and easy_install-X.Y.
    pip_script = console.pop('pip', None)
    if pip_script:
        if "ENSUREPIP_OPTIONS" not in os.environ:
            scripts_to_generate.append('pip = ' + pip_script)

        if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
            scripts_to_generate.append(
                'pip%s = %s' % (sys.version_info[0], pip_script)
            )

        scripts_to_generate.append(
            'pip%s = %s' % (get_major_minor_version(), pip_script)
        )
        # Delete any other versioned pip entry points
        pip_ep = [k for k in console if re.match(r'pip(\d(\.\d)?)?$', k)]
        for k in pip_ep:
            del console[k]
    easy_install_script = console.pop('easy_install', None)
    if easy_install_script:
        if "ENSUREPIP_OPTIONS" not in os.environ:
            scripts_to_generate.append(
                'easy_install = ' + easy_install_script
            )

        scripts_to_generate.append(
            'easy_install-%s = %s' % (
                get_major_minor_version(), easy_install_script
            )
        )
        # Delete any other versioned easy_install entry points
        easy_install_ep = [
            k for k in console if re.match(r'easy_install(-\d\.\d)?$', k)
        ]
        for k in easy_install_ep:
            del console[k]

    # Generate the console and GUI entry points specified in the wheel
    scripts_to_generate.extend(
        '%s = %s' % kv for kv in console.items()
    )

    gui_scripts_to_generate = [
        '%s = %s' % kv for kv in gui.items()
    ]

    generated_console_scripts = []  # type: List[str]

    try:
        generated_console_scripts = maker.make_multiple(scripts_to_generate)
        generated.extend(generated_console_scripts)

        generated.extend(
            maker.make_multiple(gui_scripts_to_generate, {'gui': True})
        )
    except MissingCallableSuffix as e:
        entry = e.args[0]
        raise InstallationError(
            "Invalid script entry point: {} for req: {} - A callable "
            "suffix is required. Cf https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information.".format(entry, req_description)
        )

    if warn_script_location:
        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
        if msg is not None:
            logger.warning(msg)

    # Record pip as the installer
    # (written to a temporary name first, then moved into place)
    installer = os.path.join(dest_info_dir, 'INSTALLER')
    temp_installer = os.path.join(dest_info_dir, 'INSTALLER.pip')
    with open(temp_installer, 'wb') as installer_file:
        installer_file.write(b'pip\n')
    shutil.move(temp_installer, installer)
    generated.append(installer)

    # Record details of all files installed
    record = os.path.join(dest_info_dir, 'RECORD')
    temp_record = os.path.join(dest_info_dir, 'RECORD.pip')
    with open_for_csv(record, 'r') as record_in:
        with open_for_csv(temp_record, 'w+') as record_out:
            reader = csv.reader(record_in)
            outrows = get_csv_rows_for_installed(
                reader, installed=installed, changed=changed,
                generated=generated, lib_dir=lib_dir,
            )
            writer = csv.writer(record_out)
            # Sort to simplify testing.
            for row in sorted_outrows(outrows):
                writer.writerow(row)
    shutil.move(temp_record, record)


def install_wheel(
    name,  # type: str
    wheel_path,  # type: str
    scheme,  # type: Scheme
    req_description,  # type: str
    pycompile=True,  # type: bool
    warn_script_location=True,  # type: bool
    _temp_dir_for_testing=None,  # type: Optional[str]
):
    # type: (...) -> None
    """Unpack the wheel at ``wheel_path`` into a temporary directory and
    install it with ``install_unpacked_wheel``.

    ``_temp_dir_for_testing`` is passed to ``TempDirectory`` as its path;
    the remaining arguments are forwarded unchanged.
    """
    with TempDirectory(
        path=_temp_dir_for_testing, kind="unpacked-wheel"
    ) as unpacked_dir, ZipFile(wheel_path, allowZip64=True) as z:
        unpack_file(wheel_path, unpacked_dir.path)
        install_unpacked_wheel(
            name=name,
            wheeldir=unpacked_dir.path,
            wheel_zip=z,
            scheme=scheme,
            req_description=req_description,
            pycompile=pycompile,
            warn_script_location=warn_script_location,
        )