1"""Support for installing and building the "wheel" binary package format.
2"""
3
4from __future__ import absolute_import
5
6import collections
7import compileall
8import contextlib
9import csv
10import importlib
11import logging
12import os.path
13import re
14import shutil
15import sys
16import warnings
17from base64 import urlsafe_b64encode
18from itertools import chain, starmap
19from zipfile import ZipFile
20
21from pip._vendor import pkg_resources
22from pip._vendor.distlib.scripts import ScriptMaker
23from pip._vendor.distlib.util import get_export_entry
24from pip._vendor.six import PY2, ensure_str, ensure_text, itervalues, reraise, text_type
25from pip._vendor.six.moves import filterfalse, map
26
27from pip._internal.exceptions import InstallationError
28from pip._internal.locations import get_major_minor_version
29from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
30from pip._internal.models.scheme import SCHEME_KEYS
31from pip._internal.utils.filesystem import adjacent_tmp_file, replace
32from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file, partition
33from pip._internal.utils.typing import MYPY_CHECK_RUNNING
34from pip._internal.utils.unpacking import (
35    current_umask,
36    is_within_directory,
37    set_extracted_file_to_default_mode_plus_executable,
38    zip_item_is_executable,
39)
40from pip._internal.utils.wheel import parse_wheel, pkg_resources_distribution_for_wheel
41
42# Use the custom cast function at runtime to make cast work,
43# and import typing.cast when performing pre-commit and type
44# checks
45if not MYPY_CHECK_RUNNING:
46    from pip._internal.utils.typing import cast
47else:
48    from email.message import Message
49    from typing import (
50        IO,
51        Any,
52        Callable,
53        Dict,
54        Iterable,
55        Iterator,
56        List,
57        NewType,
58        Optional,
59        Protocol,
60        Sequence,
61        Set,
62        Tuple,
63        Union,
64        cast,
65    )
66    from zipfile import ZipInfo
67
68    from pip._vendor.pkg_resources import Distribution
69
70    from pip._internal.models.scheme import Scheme
71    from pip._internal.utils.filesystem import NamedTemporaryFileResult
72
73    RecordPath = NewType('RecordPath', text_type)
74    InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
75
76    class File(Protocol):
77        src_record_path = None  # type: RecordPath
78        dest_path = None  # type: text_type
79        changed = None  # type: bool
80
81        def save(self):
82            # type: () -> None
83            pass
84
85
86logger = logging.getLogger(__name__)
87
88
89def rehash(path, blocksize=1 << 20):
90    # type: (text_type, int) -> Tuple[str, str]
91    """Return (encoded_digest, length) for path using hashlib.sha256()"""
92    h, length = hash_file(path, blocksize)
93    digest = 'sha256=' + urlsafe_b64encode(
94        h.digest()
95    ).decode('latin1').rstrip('=')
96    # unicode/str python2 issues
97    return (digest, str(length))  # type: ignore
98
99
100def csv_io_kwargs(mode):
101    # type: (str) -> Dict[str, Any]
102    """Return keyword arguments to properly open a CSV file
103    in the given mode.
104    """
105    if PY2:
106        return {'mode': '{}b'.format(mode)}
107    else:
108        return {'mode': mode, 'newline': '', 'encoding': 'utf-8'}
109
110
111def fix_script(path):
112    # type: (text_type) -> bool
113    """Replace #!python with #!/path/to/python
114    Return True if file was changed.
115    """
116    # XXX RECORD hashes will need to be updated
117    assert os.path.isfile(path)
118
119    with open(path, 'rb') as script:
120        firstline = script.readline()
121        if not firstline.startswith(b'#!python'):
122            return False
123        exename = sys.executable.encode(sys.getfilesystemencoding())
124        firstline = b'#!' + exename + os.linesep.encode("ascii")
125        rest = script.read()
126    with open(path, 'wb') as script:
127        script.write(firstline)
128        script.write(rest)
129    return True
130
131
132def wheel_root_is_purelib(metadata):
133    # type: (Message) -> bool
134    return metadata.get("Root-Is-Purelib", "").lower() == "true"
135
136
137def get_entrypoints(distribution):
138    # type: (Distribution) -> Tuple[Dict[str, str], Dict[str, str]]
139    # get the entry points and then the script names
140    try:
141        console = distribution.get_entry_map('console_scripts')
142        gui = distribution.get_entry_map('gui_scripts')
143    except KeyError:
144        # Our dict-based Distribution raises KeyError if entry_points.txt
145        # doesn't exist.
146        return {}, {}
147
148    def _split_ep(s):
149        # type: (pkg_resources.EntryPoint) -> Tuple[str, str]
150        """get the string representation of EntryPoint,
151        remove space and split on '='
152        """
153        split_parts = str(s).replace(" ", "").split("=")
154        return split_parts[0], split_parts[1]
155
156    # convert the EntryPoint objects into strings with module:function
157    console = dict(_split_ep(v) for v in console.values())
158    gui = dict(_split_ep(v) for v in gui.values())
159    return console, gui
160
161
162def message_about_scripts_not_on_PATH(scripts):
163    # type: (Sequence[str]) -> Optional[str]
164    """Determine if any scripts are not on PATH and format a warning.
165    Returns a warning message if one or more scripts are not on PATH,
166    otherwise None.
167    """
168    if not scripts:
169        return None
170
171    # Group scripts by the path they were installed in
172    grouped_by_dir = collections.defaultdict(set)  # type: Dict[str, Set[str]]
173    for destfile in scripts:
174        parent_dir = os.path.dirname(destfile)
175        script_name = os.path.basename(destfile)
176        grouped_by_dir[parent_dir].add(script_name)
177
178    # We don't want to warn for directories that are on PATH.
179    not_warn_dirs = [
180        os.path.normcase(i).rstrip(os.sep) for i in
181        os.environ.get("PATH", "").split(os.pathsep)
182    ]
183    # If an executable sits with sys.executable, we don't warn for it.
184    #     This covers the case of venv invocations without activating the venv.
185    not_warn_dirs.append(os.path.normcase(os.path.dirname(sys.executable)))
186    warn_for = {
187        parent_dir: scripts for parent_dir, scripts in grouped_by_dir.items()
188        if os.path.normcase(parent_dir) not in not_warn_dirs
189    }  # type: Dict[str, Set[str]]
190    if not warn_for:
191        return None
192
193    # Format a message
194    msg_lines = []
195    for parent_dir, dir_scripts in warn_for.items():
196        sorted_scripts = sorted(dir_scripts)  # type: List[str]
197        if len(sorted_scripts) == 1:
198            start_text = "script {} is".format(sorted_scripts[0])
199        else:
200            start_text = "scripts {} are".format(
201                ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
202            )
203
204        msg_lines.append(
205            "The {} installed in '{}' which is not on PATH."
206            .format(start_text, parent_dir)
207        )
208
209    last_line_fmt = (
210        "Consider adding {} to PATH or, if you prefer "
211        "to suppress this warning, use --no-warn-script-location."
212    )
213    if len(msg_lines) == 1:
214        msg_lines.append(last_line_fmt.format("this directory"))
215    else:
216        msg_lines.append(last_line_fmt.format("these directories"))
217
218    # Add a note if any directory starts with ~
219    warn_for_tilde = any(
220        i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i
221    )
222    if warn_for_tilde:
223        tilde_warning_msg = (
224            "NOTE: The current PATH contains path(s) starting with `~`, "
225            "which may not be expanded by all applications."
226        )
227        msg_lines.append(tilde_warning_msg)
228
229    # Returns the formatted multiline message
230    return "\n".join(msg_lines)
231
232
233def _normalized_outrows(outrows):
234    # type: (Iterable[InstalledCSVRow]) -> List[Tuple[str, str, str]]
235    """Normalize the given rows of a RECORD file.
236
237    Items in each row are converted into str. Rows are then sorted to make
238    the value more predictable for tests.
239
240    Each row is a 3-tuple (path, hash, size) and corresponds to a record of
241    a RECORD file (see PEP 376 and PEP 427 for details).  For the rows
242    passed to this function, the size can be an integer as an int or string,
243    or the empty string.
244    """
245    # Normally, there should only be one row per path, in which case the
246    # second and third elements don't come into play when sorting.
247    # However, in cases in the wild where a path might happen to occur twice,
248    # we don't want the sort operation to trigger an error (but still want
249    # determinism).  Since the third element can be an int or string, we
250    # coerce each element to a string to avoid a TypeError in this case.
251    # For additional background, see--
252    # https://github.com/pypa/pip/issues/5868
253    return sorted(
254        (ensure_str(record_path, encoding='utf-8'), hash_, str(size))
255        for record_path, hash_, size in outrows
256    )
257
258
259def _record_to_fs_path(record_path):
260    # type: (RecordPath) -> text_type
261    return record_path
262
263
264def _fs_to_record_path(path, relative_to=None):
265    # type: (text_type, Optional[text_type]) -> RecordPath
266    if relative_to is not None:
267        # On Windows, do not handle relative paths if they belong to different
268        # logical disks
269        if os.path.splitdrive(path)[0].lower() == \
270                os.path.splitdrive(relative_to)[0].lower():
271            path = os.path.relpath(path, relative_to)
272    path = path.replace(os.path.sep, '/')
273    return cast('RecordPath', path)
274
275
276def _parse_record_path(record_column):
277    # type: (str) -> RecordPath
278    p = ensure_text(record_column, encoding='utf-8')
279    return cast('RecordPath', p)
280
281
282def get_csv_rows_for_installed(
283    old_csv_rows,  # type: List[List[str]]
284    installed,  # type: Dict[RecordPath, RecordPath]
285    changed,  # type: Set[RecordPath]
286    generated,  # type: List[str]
287    lib_dir,  # type: str
288):
289    # type: (...) -> List[InstalledCSVRow]
290    """
291    :param installed: A map from archive RECORD path to installation RECORD
292        path.
293    """
294    installed_rows = []  # type: List[InstalledCSVRow]
295    for row in old_csv_rows:
296        if len(row) > 3:
297            logger.warning('RECORD line has more than three elements: %s', row)
298        old_record_path = _parse_record_path(row[0])
299        new_record_path = installed.pop(old_record_path, old_record_path)
300        if new_record_path in changed:
301            digest, length = rehash(_record_to_fs_path(new_record_path))
302        else:
303            digest = row[1] if len(row) > 1 else ''
304            length = row[2] if len(row) > 2 else ''
305        installed_rows.append((new_record_path, digest, length))
306    for f in generated:
307        path = _fs_to_record_path(f, lib_dir)
308        digest, length = rehash(f)
309        installed_rows.append((path, digest, length))
310    for installed_record_path in itervalues(installed):
311        installed_rows.append((installed_record_path, '', ''))
312    return installed_rows
313
314
315def get_console_script_specs(console):
316    # type: (Dict[str, str]) -> List[str]
317    """
318    Given the mapping from entrypoint name to callable, return the relevant
319    console script specs.
320    """
321    # Don't mutate caller's version
322    console = console.copy()
323
324    scripts_to_generate = []
325
326    # Special case pip and setuptools to generate versioned wrappers
327    #
328    # The issue is that some projects (specifically, pip and setuptools) use
329    # code in setup.py to create "versioned" entry points - pip2.7 on Python
330    # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into
331    # the wheel metadata at build time, and so if the wheel is installed with
332    # a *different* version of Python the entry points will be wrong. The
333    # correct fix for this is to enhance the metadata to be able to describe
334    # such versioned entry points, but that won't happen till Metadata 2.0 is
335    # available.
336    # In the meantime, projects using versioned entry points will either have
337    # incorrect versioned entry points, or they will not be able to distribute
338    # "universal" wheels (i.e., they will need a wheel per Python version).
339    #
340    # Because setuptools and pip are bundled with _ensurepip and virtualenv,
341    # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we
342    # override the versioned entry points in the wheel and generate the
343    # correct ones. This code is purely a short-term measure until Metadata 2.0
344    # is available.
345    #
346    # To add the level of hack in this section of code, in order to support
347    # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment
348    # variable which will control which version scripts get installed.
349    #
350    # ENSUREPIP_OPTIONS=altinstall
351    #   - Only pipX.Y and easy_install-X.Y will be generated and installed
352    # ENSUREPIP_OPTIONS=install
353    #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
354    #     that this option is technically if ENSUREPIP_OPTIONS is set and is
355    #     not altinstall
356    # DEFAULT
357    #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
358    #     and easy_install-X.Y.
359    pip_script = console.pop('pip', None)
360    if pip_script:
361        if "ENSUREPIP_OPTIONS" not in os.environ:
362            scripts_to_generate.append('pip = ' + pip_script)
363
364        if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
365            scripts_to_generate.append(
366                'pip{} = {}'.format(sys.version_info[0], pip_script)
367            )
368
369        scripts_to_generate.append(
370            'pip{} = {}'.format(get_major_minor_version(), pip_script)
371        )
372        # Delete any other versioned pip entry points
373        pip_ep = [k for k in console if re.match(r'pip(\d(\.\d)?)?$', k)]
374        for k in pip_ep:
375            del console[k]
376    easy_install_script = console.pop('easy_install', None)
377    if easy_install_script:
378        if "ENSUREPIP_OPTIONS" not in os.environ:
379            scripts_to_generate.append(
380                'easy_install = ' + easy_install_script
381            )
382
383        scripts_to_generate.append(
384            'easy_install-{} = {}'.format(
385                get_major_minor_version(), easy_install_script
386            )
387        )
388        # Delete any other versioned easy_install entry points
389        easy_install_ep = [
390            k for k in console if re.match(r'easy_install(-\d\.\d)?$', k)
391        ]
392        for k in easy_install_ep:
393            del console[k]
394
395    # Generate the console entry points specified in the wheel
396    scripts_to_generate.extend(starmap('{} = {}'.format, console.items()))
397
398    return scripts_to_generate
399
400
401class ZipBackedFile(object):
402    def __init__(self, src_record_path, dest_path, zip_file):
403        # type: (RecordPath, text_type, ZipFile) -> None
404        self.src_record_path = src_record_path
405        self.dest_path = dest_path
406        self._zip_file = zip_file
407        self.changed = False
408
409    def _getinfo(self):
410        # type: () -> ZipInfo
411        if not PY2:
412            return self._zip_file.getinfo(self.src_record_path)
413        # Python 2 does not expose a way to detect a ZIP's encoding, but the
414        # wheel specification (PEP 427) explicitly mandates that paths should
415        # use UTF-8, so we assume it is true.
416        return self._zip_file.getinfo(self.src_record_path.encode("utf-8"))
417
418    def save(self):
419        # type: () -> None
420        # directory creation is lazy and after file filtering
421        # to ensure we don't install empty dirs; empty dirs can't be
422        # uninstalled.
423        parent_dir = os.path.dirname(self.dest_path)
424        ensure_dir(parent_dir)
425
426        # When we open the output file below, any existing file is truncated
427        # before we start writing the new contents. This is fine in most
428        # cases, but can cause a segfault if pip has loaded a shared
429        # object (e.g. from pyopenssl through its vendored urllib3)
430        # Since the shared object is mmap'd an attempt to call a
431        # symbol in it will then cause a segfault. Unlinking the file
432        # allows writing of new contents while allowing the process to
433        # continue to use the old copy.
434        if os.path.exists(self.dest_path):
435            os.unlink(self.dest_path)
436
437        zipinfo = self._getinfo()
438
439        with self._zip_file.open(zipinfo) as f:
440            with open(self.dest_path, "wb") as dest:
441                shutil.copyfileobj(f, dest)
442
443        if zip_item_is_executable(zipinfo):
444            set_extracted_file_to_default_mode_plus_executable(self.dest_path)
445
446
447class ScriptFile(object):
448    def __init__(self, file):
449        # type: (File) -> None
450        self._file = file
451        self.src_record_path = self._file.src_record_path
452        self.dest_path = self._file.dest_path
453        self.changed = False
454
455    def save(self):
456        # type: () -> None
457        self._file.save()
458        self.changed = fix_script(self.dest_path)
459
460
461class MissingCallableSuffix(InstallationError):
462    def __init__(self, entry_point):
463        # type: (str) -> None
464        super(MissingCallableSuffix, self).__init__(
465            "Invalid script entry point: {} - A callable "
466            "suffix is required. Cf https://packaging.python.org/"
467            "specifications/entry-points/#use-for-scripts for more "
468            "information.".format(entry_point)
469        )
470
471
472def _raise_for_invalid_entrypoint(specification):
473    # type: (str) -> None
474    entry = get_export_entry(specification)
475    if entry is not None and entry.suffix is None:
476        raise MissingCallableSuffix(str(entry))
477
478
479class PipScriptMaker(ScriptMaker):
480    def make(self, specification, options=None):
481        # type: (str, Dict[str, Any]) -> List[str]
482        _raise_for_invalid_entrypoint(specification)
483        return super(PipScriptMaker, self).make(specification, options)
484
485
486def _install_wheel(
487    name,  # type: str
488    wheel_zip,  # type: ZipFile
489    wheel_path,  # type: str
490    scheme,  # type: Scheme
491    pycompile=True,  # type: bool
492    warn_script_location=True,  # type: bool
493    direct_url=None,  # type: Optional[DirectUrl]
494    requested=False,  # type: bool
495):
496    # type: (...) -> None
497    """Install a wheel.
498
499    :param name: Name of the project to install
500    :param wheel_zip: open ZipFile for wheel being installed
501    :param scheme: Distutils scheme dictating the install directories
502    :param req_description: String used in place of the requirement, for
503        logging
504    :param pycompile: Whether to byte-compile installed Python files
505    :param warn_script_location: Whether to check that scripts are installed
506        into a directory on PATH
507    :raises UnsupportedWheel:
508        * when the directory holds an unpacked wheel with incompatible
509          Wheel-Version
510        * when the .dist-info dir does not match the wheel
511    """
512    info_dir, metadata = parse_wheel(wheel_zip, name)
513
514    if wheel_root_is_purelib(metadata):
515        lib_dir = scheme.purelib
516    else:
517        lib_dir = scheme.platlib
518
519    # Record details of the files moved
520    #   installed = files copied from the wheel to the destination
521    #   changed = files changed while installing (scripts #! line typically)
522    #   generated = files newly generated during the install (script wrappers)
523    installed = {}  # type: Dict[RecordPath, RecordPath]
524    changed = set()  # type: Set[RecordPath]
525    generated = []  # type: List[str]
526
527    def record_installed(srcfile, destfile, modified=False):
528        # type: (RecordPath, text_type, bool) -> None
529        """Map archive RECORD paths to installation RECORD paths."""
530        newpath = _fs_to_record_path(destfile, lib_dir)
531        installed[srcfile] = newpath
532        if modified:
533            changed.add(_fs_to_record_path(destfile))
534
535    def all_paths():
536        # type: () -> Iterable[RecordPath]
537        names = wheel_zip.namelist()
538        # If a flag is set, names may be unicode in Python 2. We convert to
539        # text explicitly so these are valid for lookup in RECORD.
540        decoded_names = map(ensure_text, names)
541        for name in decoded_names:
542            yield cast("RecordPath", name)
543
544    def is_dir_path(path):
545        # type: (RecordPath) -> bool
546        return path.endswith("/")
547
548    def assert_no_path_traversal(dest_dir_path, target_path):
549        # type: (text_type, text_type) -> None
550        if not is_within_directory(dest_dir_path, target_path):
551            message = (
552                "The wheel {!r} has a file {!r} trying to install"
553                " outside the target directory {!r}"
554            )
555            raise InstallationError(
556                message.format(wheel_path, target_path, dest_dir_path)
557            )
558
559    def root_scheme_file_maker(zip_file, dest):
560        # type: (ZipFile, text_type) -> Callable[[RecordPath], File]
561        def make_root_scheme_file(record_path):
562            # type: (RecordPath) -> File
563            normed_path = os.path.normpath(record_path)
564            dest_path = os.path.join(dest, normed_path)
565            assert_no_path_traversal(dest, dest_path)
566            return ZipBackedFile(record_path, dest_path, zip_file)
567
568        return make_root_scheme_file
569
570    def data_scheme_file_maker(zip_file, scheme):
571        # type: (ZipFile, Scheme) -> Callable[[RecordPath], File]
572        scheme_paths = {}
573        for key in SCHEME_KEYS:
574            encoded_key = ensure_text(key)
575            scheme_paths[encoded_key] = ensure_text(
576                getattr(scheme, key), encoding=sys.getfilesystemencoding()
577            )
578
579        def make_data_scheme_file(record_path):
580            # type: (RecordPath) -> File
581            normed_path = os.path.normpath(record_path)
582            try:
583                _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
584            except ValueError:
585                message = (
586                    "Unexpected file in {}: {!r}. .data directory contents"
587                    " should be named like: '<scheme key>/<path>'."
588                ).format(wheel_path, record_path)
589                raise InstallationError(message)
590
591            try:
592                scheme_path = scheme_paths[scheme_key]
593            except KeyError:
594                valid_scheme_keys = ", ".join(sorted(scheme_paths))
595                message = (
596                    "Unknown scheme key used in {}: {} (for file {!r}). .data"
597                    " directory contents should be in subdirectories named"
598                    " with a valid scheme key ({})"
599                ).format(
600                    wheel_path, scheme_key, record_path, valid_scheme_keys
601                )
602                raise InstallationError(message)
603
604            dest_path = os.path.join(scheme_path, dest_subpath)
605            assert_no_path_traversal(scheme_path, dest_path)
606            return ZipBackedFile(record_path, dest_path, zip_file)
607
608        return make_data_scheme_file
609
610    def is_data_scheme_path(path):
611        # type: (RecordPath) -> bool
612        return path.split("/", 1)[0].endswith(".data")
613
614    paths = all_paths()
615    file_paths = filterfalse(is_dir_path, paths)
616    root_scheme_paths, data_scheme_paths = partition(
617        is_data_scheme_path, file_paths
618    )
619
620    make_root_scheme_file = root_scheme_file_maker(
621        wheel_zip,
622        ensure_text(lib_dir, encoding=sys.getfilesystemencoding()),
623    )
624    files = map(make_root_scheme_file, root_scheme_paths)
625
626    def is_script_scheme_path(path):
627        # type: (RecordPath) -> bool
628        parts = path.split("/", 2)
629        return (
630            len(parts) > 2 and
631            parts[0].endswith(".data") and
632            parts[1] == "scripts"
633        )
634
635    other_scheme_paths, script_scheme_paths = partition(
636        is_script_scheme_path, data_scheme_paths
637    )
638
639    make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
640    other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
641    files = chain(files, other_scheme_files)
642
643    # Get the defined entry points
644    distribution = pkg_resources_distribution_for_wheel(
645        wheel_zip, name, wheel_path
646    )
647    console, gui = get_entrypoints(distribution)
648
649    def is_entrypoint_wrapper(file):
650        # type: (File) -> bool
651        # EP, EP.exe and EP-script.py are scripts generated for
652        # entry point EP by setuptools
653        path = file.dest_path
654        name = os.path.basename(path)
655        if name.lower().endswith('.exe'):
656            matchname = name[:-4]
657        elif name.lower().endswith('-script.py'):
658            matchname = name[:-10]
659        elif name.lower().endswith(".pya"):
660            matchname = name[:-4]
661        else:
662            matchname = name
663        # Ignore setuptools-generated scripts
664        return (matchname in console or matchname in gui)
665
666    script_scheme_files = map(make_data_scheme_file, script_scheme_paths)
667    script_scheme_files = filterfalse(
668        is_entrypoint_wrapper, script_scheme_files
669    )
670    script_scheme_files = map(ScriptFile, script_scheme_files)
671    files = chain(files, script_scheme_files)
672
673    for file in files:
674        file.save()
675        record_installed(file.src_record_path, file.dest_path, file.changed)
676
677    def pyc_source_file_paths():
678        # type: () -> Iterator[text_type]
679        # We de-duplicate installation paths, since there can be overlap (e.g.
680        # file in .data maps to same location as file in wheel root).
681        # Sorting installation paths makes it easier to reproduce and debug
682        # issues related to permissions on existing files.
683        for installed_path in sorted(set(installed.values())):
684            full_installed_path = os.path.join(lib_dir, installed_path)
685            if not os.path.isfile(full_installed_path):
686                continue
687            if not full_installed_path.endswith('.py'):
688                continue
689            yield full_installed_path
690
691    def pyc_output_path(path):
692        # type: (text_type) -> text_type
693        """Return the path the pyc file would have been written to.
694        """
695        if PY2:
696            if sys.flags.optimize:
697                return path + 'o'
698            else:
699                return path + 'c'
700        else:
701            return importlib.util.cache_from_source(path)
702
703    # Compile all of the pyc files for the installed files
704    if pycompile:
705        with captured_stdout() as stdout:
706            with warnings.catch_warnings():
707                warnings.filterwarnings('ignore')
708                for path in pyc_source_file_paths():
709                    # Python 2's `compileall.compile_file` requires a str in
710                    # error cases, so we must convert to the native type.
711                    path_arg = ensure_str(
712                        path, encoding=sys.getfilesystemencoding()
713                    )
714                    success = compileall.compile_file(
715                        path_arg, force=True, quiet=True
716                    )
717                    if success:
718                        pyc_path = pyc_output_path(path)
719                        assert os.path.exists(pyc_path)
720                        pyc_record_path = cast(
721                            "RecordPath", pyc_path.replace(os.path.sep, "/")
722                        )
723                        record_installed(pyc_record_path, pyc_path)
724        logger.debug(stdout.getvalue())
725
726    maker = PipScriptMaker(None, scheme.scripts)
727
728    # Ensure old scripts are overwritten.
729    # See https://github.com/pypa/pip/issues/1800
730    maker.clobber = True
731
732    # Ensure we don't generate any variants for scripts because this is almost
733    # never what somebody wants.
734    # See https://bitbucket.org/pypa/distlib/issue/35/
735    maker.variants = {''}
736
737    # This is required because otherwise distlib creates scripts that are not
738    # executable.
739    # See https://bitbucket.org/pypa/distlib/issue/32/
740    maker.set_mode = True
741
742    # Generate the console and GUI entry points specified in the wheel
743    scripts_to_generate = get_console_script_specs(console)
744
745    gui_scripts_to_generate = list(starmap('{} = {}'.format, gui.items()))
746
747    generated_console_scripts = maker.make_multiple(scripts_to_generate)
748    generated.extend(generated_console_scripts)
749
750    generated.extend(
751        maker.make_multiple(gui_scripts_to_generate, {'gui': True})
752    )
753
754    if warn_script_location:
755        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
756        if msg is not None:
757            logger.warning(msg)
758
759    generated_file_mode = 0o666 & ~current_umask()
760
761    @contextlib.contextmanager
762    def _generate_file(path, **kwargs):
763        # type: (str, **Any) -> Iterator[NamedTemporaryFileResult]
764        with adjacent_tmp_file(path, **kwargs) as f:
765            yield f
766        os.chmod(f.name, generated_file_mode)
767        replace(f.name, path)
768
769    dest_info_dir = os.path.join(lib_dir, info_dir)
770
771    # Record pip as the installer
772    installer_path = os.path.join(dest_info_dir, 'INSTALLER')
773    with _generate_file(installer_path) as installer_file:
774        installer_file.write(b'pip\n')
775    generated.append(installer_path)
776
777    # Record the PEP 610 direct URL reference
778    if direct_url is not None:
779        direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
780        with _generate_file(direct_url_path) as direct_url_file:
781            direct_url_file.write(direct_url.to_json().encode("utf-8"))
782        generated.append(direct_url_path)
783
784    # Record the REQUESTED file
785    if requested:
786        requested_path = os.path.join(dest_info_dir, 'REQUESTED')
787        with open(requested_path, "w"):
788            pass
789        generated.append(requested_path)
790
791    record_text = distribution.get_metadata('RECORD')
792    record_rows = list(csv.reader(record_text.splitlines()))
793
794    rows = get_csv_rows_for_installed(
795        record_rows,
796        installed=installed,
797        changed=changed,
798        generated=generated,
799        lib_dir=lib_dir)
800
801    # Record details of all files installed
802    record_path = os.path.join(dest_info_dir, 'RECORD')
803
804    with _generate_file(record_path, **csv_io_kwargs('w')) as record_file:
805        # The type mypy infers for record_file is different for Python 3
806        # (typing.IO[Any]) and Python 2 (typing.BinaryIO). We explicitly
807        # cast to typing.IO[str] as a workaround.
808        writer = csv.writer(cast('IO[str]', record_file))
809        writer.writerows(_normalized_outrows(rows))
810
811
812@contextlib.contextmanager
813def req_error_context(req_description):
814    # type: (str) -> Iterator[None]
815    try:
816        yield
817    except InstallationError as e:
818        message = "For req: {}. {}".format(req_description, e.args[0])
819        reraise(
820            InstallationError, InstallationError(message), sys.exc_info()[2]
821        )
822
823
824def install_wheel(
825    name,  # type: str
826    wheel_path,  # type: str
827    scheme,  # type: Scheme
828    req_description,  # type: str
829    pycompile=True,  # type: bool
830    warn_script_location=True,  # type: bool
831    direct_url=None,  # type: Optional[DirectUrl]
832    requested=False,  # type: bool
833):
834    # type: (...) -> None
835    with ZipFile(wheel_path, allowZip64=True) as z:
836        with req_error_context(req_description):
837            _install_wheel(
838                name=name,
839                wheel_zip=z,
840                wheel_path=wheel_path,
841                scheme=scheme,
842                pycompile=pycompile,
843                warn_script_location=warn_script_location,
844                direct_url=direct_url,
845                requested=requested,
846            )
847