1"""Support for installing and building the "wheel" binary package format.
2"""
3
4from __future__ import absolute_import
5
6import collections
7import compileall
8import contextlib
9import csv
10import importlib
11import logging
12import os.path
13import re
14import shutil
15import sys
16import warnings
17from base64 import urlsafe_b64encode
18from itertools import chain, starmap
19from zipfile import ZipFile
20
21from pip._vendor import pkg_resources
22from pip._vendor.distlib.scripts import ScriptMaker
23from pip._vendor.distlib.util import get_export_entry
24from pip._vendor.six import (
25    PY2,
26    ensure_str,
27    ensure_text,
28    itervalues,
29    reraise,
30    text_type,
31)
32from pip._vendor.six.moves import filterfalse, map
33
34from pip._internal.exceptions import InstallationError
35from pip._internal.locations import get_major_minor_version
36from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
37from pip._internal.models.scheme import SCHEME_KEYS
38from pip._internal.utils.filesystem import adjacent_tmp_file, replace
39from pip._internal.utils.misc import (
40    captured_stdout,
41    ensure_dir,
42    hash_file,
43    partition,
44)
45from pip._internal.utils.typing import MYPY_CHECK_RUNNING
46from pip._internal.utils.unpacking import (
47    current_umask,
48    is_within_directory,
49    set_extracted_file_to_default_mode_plus_executable,
50    zip_item_is_executable,
51)
52from pip._internal.utils.wheel import (
53    parse_wheel,
54    pkg_resources_distribution_for_wheel,
55)
56
57# Use the custom cast function at runtime to make cast work,
58# and import typing.cast when performing pre-commit and type
59# checks
60if not MYPY_CHECK_RUNNING:
61    from pip._internal.utils.typing import cast
62else:
63    from email.message import Message
64    from typing import (
65        Any,
66        Callable,
67        Dict,
68        IO,
69        Iterable,
70        Iterator,
71        List,
72        NewType,
73        Optional,
74        Protocol,
75        Sequence,
76        Set,
77        Tuple,
78        Union,
79        cast,
80    )
81    from zipfile import ZipInfo
82
83    from pip._vendor.pkg_resources import Distribution
84
85    from pip._internal.models.scheme import Scheme
86    from pip._internal.utils.filesystem import NamedTemporaryFileResult
87
88    RecordPath = NewType('RecordPath', text_type)
89    InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
90
    # Structural type shared by the installable-file wrappers in this module
    # (ZipBackedFile and ScriptFile both provide these members).
    class File(Protocol):
        # Path of the file inside the wheel archive, in RECORD form.
        src_record_path = None  # type: RecordPath
        # Filesystem path the file is written to by save().
        dest_path = None  # type: text_type
        # Whether the file's content was modified while installing; used to
        # decide which RECORD entries need a fresh hash.
        changed = None  # type: bool

        def save(self):
            # type: () -> None
            # Write the file to dest_path.
            pass
99
100
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
102
103
def rehash(path, blocksize=1 << 20):
    # type: (text_type, int) -> Tuple[str, str]
    """Hash the file at ``path`` and return ``(encoded_digest, length)``.

    The digest is the URL-safe base64 encoding of the hash returned by
    ``hash_file`` with its ``=`` padding stripped, prefixed with ``sha256=``
    (the form used in RECORD files). The length is returned as a string.

    :param path: File to hash.
    :param blocksize: Passed through to ``hash_file``.
    """
    hasher, size = hash_file(path, blocksize)
    encoded = urlsafe_b64encode(hasher.digest()).decode('latin1')
    # unicode/str python2 issues
    return ('sha256=' + encoded.rstrip('='), str(size))  # type: ignore
113
114
def csv_io_kwargs(mode):
    # type: (str) -> Dict[str, Any]
    """Build the keyword arguments for opening a CSV file in ``mode``.

    On Python 2 the file is opened in binary mode; otherwise it is opened
    as UTF-8 text with newline translation disabled, as the ``csv`` module
    expects.
    """
    if not PY2:
        return {'mode': mode, 'newline': '', 'encoding': 'utf-8'}
    return {'mode': '{}b'.format(mode)}
124
125
def fix_script(path):
    # type: (text_type) -> bool
    """Rewrite a ``#!python`` shebang to point at the running interpreter.

    The first line of the file is replaced with ``#!<sys.executable>`` when
    it starts with ``#!python``; the rest of the file is kept as-is.

    :param path: Path of an existing script file.
    :return: True if the file was rewritten, False if it was left untouched.
    """
    # XXX RECORD hashes will need to be updated
    assert os.path.isfile(path)

    with open(path, 'rb') as source:
        shebang = source.readline()
        if not shebang.startswith(b'#!python'):
            return False
        remainder = source.read()

    interpreter = sys.executable.encode(sys.getfilesystemencoding())
    new_shebang = b'#!' + interpreter + os.linesep.encode("ascii")
    with open(path, 'wb') as target:
        target.writelines([new_shebang, remainder])
    return True
145
146
def wheel_root_is_purelib(metadata):
    # type: (Message) -> bool
    """Return True if ``Root-Is-Purelib`` is "true" (case-insensitive).

    A missing header counts as not purelib.
    """
    flag = metadata.get("Root-Is-Purelib", "")
    return flag.lower() == "true"
150
151
def get_entrypoints(distribution):
    # type: (Distribution) -> Tuple[Dict[str, str], Dict[str, str]]
    """Return the console and GUI script entry points of ``distribution``.

    :return: A ``(console, gui)`` pair of dicts mapping script name to its
        target, both taken from the string form of each entry point. Both
        dicts are empty when looking up the entry map raises KeyError.
    """
    try:
        console_map = distribution.get_entry_map('console_scripts')
        gui_map = distribution.get_entry_map('gui_scripts')
    except KeyError:
        # Our dict-based Distribution raises KeyError if entry_points.txt
        # doesn't exist.
        return {}, {}

    def _name_and_target(entry_point):
        # type: (pkg_resources.EntryPoint) -> Tuple[str, str]
        """Strip spaces from str(entry_point) and split it on '='."""
        fields = str(entry_point).replace(" ", "").split("=")
        return fields[0], fields[1]

    def _to_specs(entry_map):
        # type: (Dict[str, Any]) -> Dict[str, str]
        """Convert EntryPoint objects into a name -> target mapping."""
        specs = {}
        for entry_point in entry_map.values():
            script_name, target = _name_and_target(entry_point)
            specs[script_name] = target
        return specs

    return _to_specs(console_map), _to_specs(gui_map)
175
176
def message_about_scripts_not_on_PATH(scripts):
    # type: (Sequence[str]) -> Optional[str]
    """Build a warning about scripts installed outside of PATH.

    :param scripts: Paths of the installed scripts.
    :return: A multi-line warning, or None when there are no scripts or all
        of them live in a directory that is on PATH or that holds
        ``sys.executable``.
    """
    if not scripts:
        return None

    # Collect the script names installed into each directory.
    by_directory = collections.defaultdict(set)  # type: Dict[str, Set[str]]
    for script_path in scripts:
        directory, filename = os.path.split(script_path)
        by_directory[directory].add(filename)

    path_entries = os.environ.get("PATH", "").split(os.pathsep)

    # Directories on PATH never trigger a warning, and neither does the
    # directory of sys.executable (a venv used without being activated).
    ignored = {os.path.normcase(entry).rstrip(os.sep) for entry in path_entries}
    ignored.add(os.path.normcase(os.path.dirname(sys.executable)))

    offending = [
        (directory, names) for directory, names in by_directory.items()
        if os.path.normcase(directory) not in ignored
    ]
    if not offending:
        return None

    # One line per directory, listing its scripts in sorted order.
    lines = []
    for directory, names in offending:
        ordered = sorted(names)  # type: List[str]
        if len(ordered) == 1:
            subject = "script {} is".format(ordered[0])
        else:
            subject = "scripts {} are".format(
                ", ".join(ordered[:-1]) + " and " + ordered[-1]
            )
        lines.append(
            "The {} installed in '{}' which is not on PATH."
            .format(subject, directory)
        )

    noun = "this directory" if len(lines) == 1 else "these directories"
    lines.append(
        (
            "Consider adding {} to PATH or, if you prefer "
            "to suppress this warning, use --no-warn-script-location."
        ).format(noun)
    )

    # Mention PATH entries starting with ~, since not every application
    # expands them.
    if any(entry.startswith("~") for entry in path_entries if entry):
        lines.append(
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )

    return "\n".join(lines)
246
247
def _normalized_outrows(outrows):
    # type: (Iterable[InstalledCSVRow]) -> List[Tuple[str, str, str]]
    """Return the given RECORD rows with string fields, in sorted order.

    Each row is a 3-tuple (path, hash, size) corresponding to one record of
    a RECORD file (see PEP 376 and PEP 427). The size passed in may be an
    int, a string holding an integer, or the empty string. The path is
    converted to the native string type and the size to ``str``; sorting
    makes the output predictable for tests.
    """
    # Paths are normally unique, so sorting is decided by the first element
    # alone. Duplicate paths do occur in the wild, though, and then the
    # remaining elements are compared too. Because the size may be an int
    # or a string, every element is turned into a string first so that the
    # sort stays deterministic and never raises a TypeError.
    # For additional background, see--
    # https://github.com/pypa/pip/issues/5868
    normalized = [
        (ensure_str(record_path, encoding='utf-8'), hash_, str(size))
        for record_path, hash_, size in outrows
    ]
    normalized.sort()
    return normalized
272
273
274def _record_to_fs_path(record_path):
275    # type: (RecordPath) -> text_type
276    return record_path
277
278
279def _fs_to_record_path(path, relative_to=None):
280    # type: (text_type, Optional[text_type]) -> RecordPath
281    if relative_to is not None:
282        # On Windows, do not handle relative paths if they belong to different
283        # logical disks
284        if os.path.splitdrive(path)[0].lower() == \
285                os.path.splitdrive(relative_to)[0].lower():
286            path = os.path.relpath(path, relative_to)
287    path = path.replace(os.path.sep, '/')
288    return cast('RecordPath', path)
289
290
def _parse_record_path(record_column):
    # type: (str) -> RecordPath
    """Decode the path column of a RECORD row (as UTF-8) into text."""
    return cast('RecordPath', ensure_text(record_column, encoding='utf-8'))
295
296
def get_csv_rows_for_installed(
    old_csv_rows,  # type: List[List[str]]
    installed,  # type: Dict[RecordPath, RecordPath]
    changed,  # type: Set[RecordPath]
    generated,  # type: List[str]
    lib_dir,  # type: str
):
    # type: (...) -> List[InstalledCSVRow]
    """Compute the rows of the installed RECORD file.

    :param old_csv_rows: Rows parsed from the RECORD file shipped in the
        wheel. Blank rows are ignored.
    :param installed: A map from archive RECORD path to installation RECORD
        path. Entries matching a row of ``old_csv_rows`` are removed from
        this dict as they are processed.
    :param changed: RECORD paths of files modified during installation;
        these get a fresh hash and size instead of the values from the
        wheel's RECORD. Entries may be relative to ``lib_dir`` or be the
        un-relativised destination path.
    :param generated: Filesystem paths of files created during installation;
        each is hashed and recorded relative to ``lib_dir``.
    :param lib_dir: Directory that RECORD paths are relative to.
    :return: ``(path, digest, length)`` rows: one per non-blank old row, one
        per generated file, then one (with empty digest and length) for each
        installed file that the old RECORD did not mention.
    """
    installed_rows = []  # type: List[InstalledCSVRow]
    for row in old_csv_rows:
        if not row:
            # csv.reader yields an empty list for a blank line; there is no
            # path to record, so skip it rather than fail on row[0].
            continue
        if len(row) > 3:
            logger.warning('RECORD line has more than three elements: %s', row)
        old_record_path = _parse_record_path(row[0])
        new_record_path = installed.pop(old_record_path, old_record_path)

        needs_rehash = False
        if changed:
            # RECORD paths are relative to lib_dir, so resolve against it
            # (an absolute record path is returned unchanged by join).
            # Normalising lexically mirrors how the relative path was
            # derived and lets us also match entries of `changed` that hold
            # the un-relativised destination path.
            fs_path = os.path.normpath(
                os.path.join(lib_dir, _record_to_fs_path(new_record_path))
            )
            needs_rehash = (
                new_record_path in changed or
                _fs_to_record_path(fs_path) in changed
            )

        if needs_rehash:
            digest, length = rehash(fs_path)
        else:
            digest = row[1] if len(row) > 1 else ''
            length = row[2] if len(row) > 2 else ''
        installed_rows.append((new_record_path, digest, length))
    for f in generated:
        path = _fs_to_record_path(f, lib_dir)
        digest, length = rehash(f)
        installed_rows.append((path, digest, length))
    # Whatever is left was installed but not listed in the wheel's RECORD.
    for installed_record_path in installed.values():
        installed_rows.append((installed_record_path, '', ''))
    return installed_rows
328
329
def get_console_script_specs(console):
    # type: (Dict[str, str]) -> List[str]
    """
    Given the mapping from entrypoint name to callable, return the relevant
    console script specs.

    :param console: Map from console script name to its callable; it is
        copied, so the caller's dict is not modified.
    :return: ``"name = callable"`` specs. The entry points of pip and
        setuptools (``pip*`` and ``easy_install*``) are replaced by ones
        versioned for the running interpreter; every other entry point is
        passed through.
    """
    # Don't mutate caller's version
    console = console.copy()

    scripts_to_generate = []

    # Special case pip and setuptools to generate versioned wrappers
    #
    # The issue is that some projects (specifically, pip and setuptools) use
    # code in setup.py to create "versioned" entry points - pip2.7 on Python
    # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into
    # the wheel metadata at build time, and so if the wheel is installed with
    # a *different* version of Python the entry points will be wrong. The
    # correct fix for this is to enhance the metadata to be able to describe
    # such versioned entry points, but that won't happen till Metadata 2.0 is
    # available.
    # In the meantime, projects using versioned entry points will either have
    # incorrect versioned entry points, or they will not be able to distribute
    # "universal" wheels (i.e., they will need a wheel per Python version).
    #
    # Because setuptools and pip are bundled with _ensurepip and virtualenv,
    # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we
    # override the versioned entry points in the wheel and generate the
    # correct ones. This code is purely a short-term measure until Metadata 2.0
    # is available.
    #
    # To add to the level of hack in this section of code, in order to support
    # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment
    # variable which will control which version scripts get installed.
    #
    # ENSUREPIP_OPTIONS=altinstall
    #   - Only pipX.Y and easy_install-X.Y will be generated and installed
    # ENSUREPIP_OPTIONS=install
    #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
    #     that this option is technically if ENSUREPIP_OPTIONS is set and is
    #     not altinstall
    # DEFAULT
    #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
    #     and easy_install-X.Y.
    pip_script = console.pop('pip', None)
    if pip_script:
        if "ENSUREPIP_OPTIONS" not in os.environ:
            scripts_to_generate.append('pip = ' + pip_script)

        if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
            scripts_to_generate.append(
                'pip{} = {}'.format(sys.version_info[0], pip_script)
            )

        scripts_to_generate.append(
            'pip{} = {}'.format(get_major_minor_version(), pip_script)
        )
        # Delete any other versioned pip entry points. Version components
        # may have several digits (e.g. pip3.10), hence ``\d+``.
        pip_ep = [k for k in console if re.match(r'pip(\d+(\.\d+)?)?$', k)]
        for k in pip_ep:
            del console[k]
    easy_install_script = console.pop('easy_install', None)
    if easy_install_script:
        if "ENSUREPIP_OPTIONS" not in os.environ:
            scripts_to_generate.append(
                'easy_install = ' + easy_install_script
            )

        scripts_to_generate.append(
            'easy_install-{} = {}'.format(
                get_major_minor_version(), easy_install_script
            )
        )
        # Delete any other versioned easy_install entry points (multi-digit
        # version components included, e.g. easy_install-3.10).
        easy_install_ep = [
            k for k in console if re.match(r'easy_install(-\d+\.\d+)?$', k)
        ]
        for k in easy_install_ep:
            del console[k]

    # Generate the console entry points specified in the wheel
    scripts_to_generate.extend(starmap('{} = {}'.format, console.items()))

    return scripts_to_generate
414
415
class ZipBackedFile(object):
    """A file that is installed by extracting one member of a zip archive."""

    def __init__(self, src_record_path, dest_path, zip_file):
        # type: (RecordPath, text_type, ZipFile) -> None
        """
        :param src_record_path: Path of the member inside the archive.
        :param dest_path: Filesystem path the member is extracted to.
        :param zip_file: Open archive to read the member from.
        """
        self._zip_file = zip_file
        self.src_record_path = src_record_path
        self.dest_path = dest_path
        # Extraction never alters the content.
        self.changed = False

    def _getinfo(self):
        # type: () -> ZipInfo
        """Look up the ZipInfo of the archive member for this file."""
        member = self.src_record_path
        if PY2:
            # Python 2 does not expose a way to detect a ZIP's encoding, but
            # the wheel specification (PEP 427) explicitly mandates that
            # paths should use UTF-8, so we assume it is true.
            member = member.encode("utf-8")
        return self._zip_file.getinfo(member)

    def save(self):
        # type: () -> None
        """Extract the member to ``dest_path``, replacing any existing file.

        The executable bit is set on the result when the archive member is
        marked as executable.
        """
        # Directories are created lazily, after file filtering, so that no
        # empty directory gets installed; empty dirs can't be uninstalled.
        ensure_dir(os.path.dirname(self.dest_path))

        # Opening the output file below truncates any existing file before
        # the new contents are written. That is usually fine, but it can
        # cause a segfault if pip has loaded a shared object (e.g. from
        # pyopenssl through its vendored urllib3): the shared object is
        # mmap'd, so calling a symbol in it after truncation crashes.
        # Unlinking first lets the process keep using the old copy while
        # the new contents are written.
        if os.path.exists(self.dest_path):
            os.unlink(self.dest_path)

        zipinfo = self._getinfo()

        with self._zip_file.open(zipinfo) as src, \
                open(self.dest_path, "wb") as dst:
            shutil.copyfileobj(src, dst)

        if zip_item_is_executable(zipinfo):
            set_extracted_file_to_default_mode_plus_executable(self.dest_path)
460
461
class ScriptFile(object):
    """Wrap another file so that its shebang is fixed up after saving."""

    def __init__(self, file):
        # type: (File) -> None
        """
        :param file: The file to delegate to; its source and destination
            paths are exposed on this wrapper as well.
        """
        self._file = file
        self.src_record_path = file.src_record_path
        self.dest_path = file.dest_path
        self.changed = False

    def save(self):
        # type: () -> None
        """Save the wrapped file, then rewrite its ``#!python`` line.

        ``changed`` records whether the script was actually rewritten.
        """
        self._file.save()
        was_rewritten = fix_script(self.dest_path)
        self.changed = was_rewritten
474
475
class MissingCallableSuffix(InstallationError):
    """Raised for a script entry point that names no callable."""

    def __init__(self, entry_point):
        # type: (str) -> None
        message = (
            "Invalid script entry point: {} - A callable "
            "suffix is required. Cf https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information."
        ).format(entry_point)
        super(MissingCallableSuffix, self).__init__(message)
485
486
def _raise_for_invalid_entrypoint(specification):
    # type: (str) -> None
    """Reject an entry point specification that lacks a callable suffix.

    :raises MissingCallableSuffix: when the specification parses to an
        export entry whose ``suffix`` is None.
    """
    entry = get_export_entry(specification)
    if entry is None:
        return
    if entry.suffix is None:
        raise MissingCallableSuffix(str(entry))
492
493
class PipScriptMaker(ScriptMaker):
    """ScriptMaker that validates each specification before using it."""

    def make(self, specification, options=None):
        # type: (str, Dict[str, Any]) -> List[str]
        """Validate ``specification``, then defer to ``ScriptMaker.make``.

        :raises MissingCallableSuffix: when the specification has no
            callable suffix.
        """
        _raise_for_invalid_entrypoint(specification)
        parent = super(PipScriptMaker, self)
        return parent.make(specification, options)
499
500
def _install_wheel(
    name,  # type: str
    wheel_zip,  # type: ZipFile
    wheel_path,  # type: str
    scheme,  # type: Scheme
    pycompile=True,  # type: bool
    warn_script_location=True,  # type: bool
    direct_url=None,  # type: Optional[DirectUrl]
    requested=False,  # type: bool
):
    # type: (...) -> None
    """Install a wheel.

    Files from the archive are extracted into the directories given by
    ``scheme``, Python sources are optionally byte-compiled, script wrappers
    are generated for the declared entry points, and the INSTALLER,
    (optionally) direct URL and REQUESTED metadata files plus an updated
    RECORD are written into the installed .dist-info directory.

    :param name: Name of the project to install
    :param wheel_zip: open ZipFile for wheel being installed
    :param wheel_path: Path of the wheel; used in error messages and when
        building the distribution object for the wheel
    :param scheme: Distutils scheme dictating the install directories
    :param pycompile: Whether to byte-compile installed Python files
    :param warn_script_location: Whether to check that scripts are installed
        into a directory on PATH
    :param direct_url: If not None, its JSON form is written to the PEP 610
        direct URL metadata file
    :param requested: Whether to create the REQUESTED marker file
    :raises UnsupportedWheel:
        * when the directory holds an unpacked wheel with incompatible
          Wheel-Version
        * when the .dist-info dir does not match the wheel
        (not raised directly here; presumably comes from the wheel-parsing
        helpers -- TODO confirm)
    :raises InstallationError: when an archive member would be installed
        outside its target directory, or when a file under a ``.data``
        directory is not inside a valid scheme-key subdirectory
    """
    info_dir, metadata = parse_wheel(wheel_zip, name)

    # Files at the root of the wheel go to purelib or platlib, depending on
    # the Root-Is-Purelib header.
    if wheel_root_is_purelib(metadata):
        lib_dir = scheme.purelib
    else:
        lib_dir = scheme.platlib

    # Record details of the files moved
    #   installed = files copied from the wheel to the destination
    #   changed = files changed while installing (scripts #! line typically)
    #   generated = files newly generated during the install (script wrappers)
    installed = {}  # type: Dict[RecordPath, RecordPath]
    changed = set()  # type: Set[RecordPath]
    generated = []  # type: List[str]

    def record_installed(srcfile, destfile, modified=False):
        # type: (RecordPath, text_type, bool) -> None
        """Map archive RECORD paths to installation RECORD paths."""
        newpath = _fs_to_record_path(destfile, lib_dir)
        installed[srcfile] = newpath
        if modified:
            # NOTE(review): unlike `newpath`, this entry is not made relative
            # to lib_dir -- confirm that consumers of `changed` compare
            # against the same form.
            changed.add(_fs_to_record_path(destfile))

    def all_paths():
        # type: () -> Iterable[RecordPath]
        """Yield every member name of the wheel archive as text."""
        names = wheel_zip.namelist()
        # If a flag is set, names may be unicode in Python 2. We convert to
        # text explicitly so these are valid for lookup in RECORD.
        decoded_names = map(ensure_text, names)
        for name in decoded_names:
            yield cast("RecordPath", name)

    def is_dir_path(path):
        # type: (RecordPath) -> bool
        # Archive members ending in "/" are treated as directory entries.
        return path.endswith("/")

    def assert_no_path_traversal(dest_dir_path, target_path):
        # type: (text_type, text_type) -> None
        """Raise InstallationError if target_path is not within
        dest_dir_path.
        """
        if not is_within_directory(dest_dir_path, target_path):
            message = (
                "The wheel {!r} has a file {!r} trying to install"
                " outside the target directory {!r}"
            )
            raise InstallationError(
                message.format(wheel_path, target_path, dest_dir_path)
            )

    def root_scheme_file_maker(zip_file, dest):
        # type: (ZipFile, text_type) -> Callable[[RecordPath], File]
        """Return a factory for files that are installed directly under
        ``dest``.
        """
        def make_root_scheme_file(record_path):
            # type: (RecordPath) -> File
            normed_path = os.path.normpath(record_path)
            dest_path = os.path.join(dest, normed_path)
            assert_no_path_traversal(dest, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_root_scheme_file

    def data_scheme_file_maker(zip_file, scheme):
        # type: (ZipFile, Scheme) -> Callable[[RecordPath], File]
        """Return a factory for files under the wheel's ``.data`` directory,
        which are installed below the directory of their scheme key.
        """
        # Text versions of every scheme directory, keyed by scheme key.
        scheme_paths = {}
        for key in SCHEME_KEYS:
            encoded_key = ensure_text(key)
            scheme_paths[encoded_key] = ensure_text(
                getattr(scheme, key), encoding=sys.getfilesystemencoding()
            )

        def make_data_scheme_file(record_path):
            # type: (RecordPath) -> File
            normed_path = os.path.normpath(record_path)
            try:
                # Expected layout: <name>.data/<scheme key>/<path>
                _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
            except ValueError:
                message = (
                    "Unexpected file in {}: {!r}. .data directory contents"
                    " should be named like: '<scheme key>/<path>'."
                ).format(wheel_path, record_path)
                raise InstallationError(message)

            try:
                scheme_path = scheme_paths[scheme_key]
            except KeyError:
                valid_scheme_keys = ", ".join(sorted(scheme_paths))
                message = (
                    "Unknown scheme key used in {}: {} (for file {!r}). .data"
                    " directory contents should be in subdirectories named"
                    " with a valid scheme key ({})"
                ).format(
                    wheel_path, scheme_key, record_path, valid_scheme_keys
                )
                raise InstallationError(message)

            dest_path = os.path.join(scheme_path, dest_subpath)
            assert_no_path_traversal(scheme_path, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_data_scheme_file

    def is_data_scheme_path(path):
        # type: (RecordPath) -> bool
        # True when the first path component ends with ".data".
        return path.split("/", 1)[0].endswith(".data")

    # Build the pipeline of files to install. Nothing is written here; the
    # files are saved by the `for file in files` loop further down.
    # NOTE(review): `partition` presumably returns (non-matching, matching),
    # judging by how its results are named and used below -- verify against
    # pip._internal.utils.misc.partition.
    paths = all_paths()
    file_paths = filterfalse(is_dir_path, paths)
    root_scheme_paths, data_scheme_paths = partition(
        is_data_scheme_path, file_paths
    )

    make_root_scheme_file = root_scheme_file_maker(
        wheel_zip,
        ensure_text(lib_dir, encoding=sys.getfilesystemencoding()),
    )
    files = map(make_root_scheme_file, root_scheme_paths)

    def is_script_scheme_path(path):
        # type: (RecordPath) -> bool
        # True for paths of the form <name>.data/scripts/<path>.
        parts = path.split("/", 2)
        return (
            len(parts) > 2 and
            parts[0].endswith(".data") and
            parts[1] == "scripts"
        )

    other_scheme_paths, script_scheme_paths = partition(
        is_script_scheme_path, data_scheme_paths
    )

    make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
    other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
    files = chain(files, other_scheme_files)

    # Get the defined entry points
    distribution = pkg_resources_distribution_for_wheel(
        wheel_zip, name, wheel_path
    )
    console, gui = get_entrypoints(distribution)

    def is_entrypoint_wrapper(file):
        # type: (File) -> bool
        # EP, EP.exe and EP-script.py are scripts generated for
        # entry point EP by setuptools
        path = file.dest_path
        name = os.path.basename(path)
        if name.lower().endswith('.exe'):
            matchname = name[:-4]
        elif name.lower().endswith('-script.py'):
            matchname = name[:-10]
        elif name.lower().endswith(".pya"):
            matchname = name[:-4]
        else:
            matchname = name
        # Ignore setuptools-generated scripts
        return (matchname in console or matchname in gui)

    # Scripts shipped in the wheel are installed with their shebang fixed,
    # except for wrappers of declared entry points: those are regenerated
    # below instead of being copied.
    script_scheme_files = map(make_data_scheme_file, script_scheme_paths)
    script_scheme_files = filterfalse(
        is_entrypoint_wrapper, script_scheme_files
    )
    script_scheme_files = map(ScriptFile, script_scheme_files)
    files = chain(files, script_scheme_files)

    # Extract every file and remember where it went.
    for file in files:
        file.save()
        record_installed(file.src_record_path, file.dest_path, file.changed)

    def pyc_source_file_paths():
        # type: () -> Iterator[text_type]
        # We de-duplicate installation paths, since there can be overlap (e.g.
        # file in .data maps to same location as file in wheel root).
        # Sorting installation paths makes it easier to reproduce and debug
        # issues related to permissions on existing files.
        for installed_path in sorted(set(installed.values())):
            full_installed_path = os.path.join(lib_dir, installed_path)
            if not os.path.isfile(full_installed_path):
                continue
            if not full_installed_path.endswith('.py'):
                continue
            yield full_installed_path

    def pyc_output_path(path):
        # type: (text_type) -> text_type
        """Return the path the pyc file would have been written to.
        """
        if PY2:
            if sys.flags.optimize:
                return path + 'o'
            else:
                return path + 'c'
        else:
            return importlib.util.cache_from_source(path)

    # Compile all of the pyc files for the installed files
    if pycompile:
        # Compiler output and warnings are captured rather than shown; the
        # captured output is logged at debug level afterwards.
        with captured_stdout() as stdout:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                for path in pyc_source_file_paths():
                    # Python 2's `compileall.compile_file` requires a str in
                    # error cases, so we must convert to the native type.
                    path_arg = ensure_str(
                        path, encoding=sys.getfilesystemencoding()
                    )
                    success = compileall.compile_file(
                        path_arg, force=True, quiet=True
                    )
                    if success:
                        # Track the compiled file so that it ends up in
                        # RECORD as well.
                        pyc_path = pyc_output_path(path)
                        assert os.path.exists(pyc_path)
                        pyc_record_path = cast(
                            "RecordPath", pyc_path.replace(os.path.sep, "/")
                        )
                        record_installed(pyc_record_path, pyc_path)
        logger.debug(stdout.getvalue())

    maker = PipScriptMaker(None, scheme.scripts)

    # Ensure old scripts are overwritten.
    # See https://github.com/pypa/pip/issues/1800
    maker.clobber = True

    # Ensure we don't generate any variants for scripts because this is almost
    # never what somebody wants.
    # See https://bitbucket.org/pypa/distlib/issue/35/
    maker.variants = {''}

    # This is required because otherwise distlib creates scripts that are not
    # executable.
    # See https://bitbucket.org/pypa/distlib/issue/32/
    maker.set_mode = True

    # Generate the console and GUI entry points specified in the wheel
    scripts_to_generate = get_console_script_specs(console)

    gui_scripts_to_generate = list(starmap('{} = {}'.format, gui.items()))

    generated_console_scripts = maker.make_multiple(scripts_to_generate)
    generated.extend(generated_console_scripts)

    generated.extend(
        maker.make_multiple(gui_scripts_to_generate, {'gui': True})
    )

    # Only the console scripts are checked against PATH.
    if warn_script_location:
        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
        if msg is not None:
            logger.warning(msg)

    # Mode for the metadata files generated below: rw for everyone, minus
    # the bits masked out by the current umask.
    generated_file_mode = 0o666 & ~current_umask()

    @contextlib.contextmanager
    def _generate_file(path, **kwargs):
        # type: (str, **Any) -> Iterator[NamedTemporaryFileResult]
        """Write to a temporary file next to ``path``; once the body has
        finished without error, set its mode and move it over ``path``.
        """
        with adjacent_tmp_file(path, **kwargs) as f:
            yield f
        os.chmod(f.name, generated_file_mode)
        replace(f.name, path)

    dest_info_dir = os.path.join(lib_dir, info_dir)

    # Record pip as the installer
    installer_path = os.path.join(dest_info_dir, 'INSTALLER')
    with _generate_file(installer_path) as installer_file:
        installer_file.write(b'pip\n')
    generated.append(installer_path)

    # Record the PEP 610 direct URL reference
    if direct_url is not None:
        direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
        with _generate_file(direct_url_path) as direct_url_file:
            direct_url_file.write(direct_url.to_json().encode("utf-8"))
        generated.append(direct_url_path)

    # Record the REQUESTED file
    if requested:
        requested_path = os.path.join(dest_info_dir, 'REQUESTED')
        # The file is an empty marker; it is created directly rather than
        # through _generate_file.
        with open(requested_path, "w"):
            pass
        generated.append(requested_path)

    # Start from the RECORD shipped in the wheel and update it to describe
    # what was actually installed.
    record_text = distribution.get_metadata('RECORD')
    record_rows = list(csv.reader(record_text.splitlines()))

    rows = get_csv_rows_for_installed(
        record_rows,
        installed=installed,
        changed=changed,
        generated=generated,
        lib_dir=lib_dir)

    # Record details of all files installed
    record_path = os.path.join(dest_info_dir, 'RECORD')

    with _generate_file(record_path, **csv_io_kwargs('w')) as record_file:
        # The type mypy infers for record_file is different for Python 3
        # (typing.IO[Any]) and Python 2 (typing.BinaryIO). We explicitly
        # cast to typing.IO[str] as a workaround.
        writer = csv.writer(cast('IO[str]', record_file))
        writer.writerows(_normalized_outrows(rows))
825
826
@contextlib.contextmanager
def req_error_context(req_description):
    # type: (str) -> Iterator[None]
    """Prefix any InstallationError raised in the body with the requirement.

    The error is re-raised as a new InstallationError whose message starts
    with ``For req: <req_description>.``, keeping the original traceback.
    """
    try:
        yield
    except InstallationError as e:
        traceback = sys.exc_info()[2]
        message = "For req: {}. {}".format(req_description, e.args[0])
        reraise(InstallationError, InstallationError(message), traceback)
837
838
def install_wheel(
    name,  # type: str
    wheel_path,  # type: str
    scheme,  # type: Scheme
    req_description,  # type: str
    pycompile=True,  # type: bool
    warn_script_location=True,  # type: bool
    direct_url=None,  # type: Optional[DirectUrl]
    requested=False,  # type: bool
):
    # type: (...) -> None
    """Open the wheel at ``wheel_path`` and install it.

    :param name: Name of the project to install
    :param wheel_path: Path of the wheel file
    :param scheme: Install directories to use
    :param req_description: String used in place of the requirement when
        reporting an InstallationError
    :param pycompile: Passed through to ``_install_wheel``
    :param warn_script_location: Passed through to ``_install_wheel``
    :param direct_url: Passed through to ``_install_wheel``
    :param requested: Passed through to ``_install_wheel``
    """
    install_kwargs = dict(
        name=name,
        wheel_path=wheel_path,
        scheme=scheme,
        pycompile=pycompile,
        warn_script_location=warn_script_location,
        direct_url=direct_url,
        requested=requested,
    )
    with ZipFile(wheel_path, allowZip64=True) as z, \
            req_error_context(req_description):
        _install_wheel(wheel_zip=z, **install_kwargs)
862