1"""Support for installing and building the "wheel" binary package format.
2"""
3
4# The following comment should be removed at some point in the future.
5# mypy: strict-optional=False
6
7from __future__ import absolute_import
8
9import collections
10import compileall
11import csv
12import logging
13import os.path
14import re
15import shutil
16import stat
17import sys
18import warnings
19from base64 import urlsafe_b64encode
20from zipfile import ZipFile
21
22from pipenv.patched.notpip._vendor import pkg_resources
23from pipenv.patched.notpip._vendor.distlib.scripts import ScriptMaker
24from pipenv.patched.notpip._vendor.distlib.util import get_export_entry
25from pipenv.patched.notpip._vendor.six import StringIO
26
27from pipenv.patched.notpip._internal.exceptions import InstallationError
28from pipenv.patched.notpip._internal.locations import get_major_minor_version
29from pipenv.patched.notpip._internal.utils.misc import captured_stdout, ensure_dir, hash_file
30from pipenv.patched.notpip._internal.utils.temp_dir import TempDirectory
31from pipenv.patched.notpip._internal.utils.typing import MYPY_CHECK_RUNNING
32from pipenv.patched.notpip._internal.utils.unpacking import unpack_file
33from pipenv.patched.notpip._internal.utils.wheel import parse_wheel
34
if MYPY_CHECK_RUNNING:
    # Everything imported in this branch is referenced only from the
    # ``# type:`` comments in this module, so it is loaded solely when
    # MYPY_CHECK_RUNNING is truthy.
    from email.message import Message
    from typing import (
        Dict, List, Optional, Sequence, Tuple, IO, Text, Any,
        Iterable, Callable, Set,
    )

    from pipenv.patched.notpip._internal.models.scheme import Scheme

    # Type alias used in this module's type comments for one row of a
    # RECORD file: a tuple of strings of arbitrary length.
    InstalledCSVRow = Tuple[str, ...]


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
48
49
def normpath(src, p):
    # type: (str, str) -> str
    """Return ``src`` relative to ``p``, using '/' as the separator.

    :param src: Path to express relatively.
    :param p: Directory the result is relative to.
    """
    relative = os.path.relpath(src, p)
    # Swap the platform separator for '/' so the result looks the same on
    # every platform.
    return '/'.join(relative.split(os.path.sep))
53
54
def rehash(path, blocksize=1 << 20):
    # type: (str, int) -> Tuple[str, str]
    """Return ``(encoded_digest, length)`` for the file at ``path``.

    The hash object and the length both come from ``hash_file``.  The digest
    is urlsafe-base64 encoded, stripped of its trailing ``=`` padding and
    prefixed with ``sha256=``; the length is returned as a string.

    :param path: File to hash.
    :param blocksize: Block size forwarded to ``hash_file``.
    """
    hasher, size = hash_file(path, blocksize)
    encoded = urlsafe_b64encode(hasher.digest()).decode('latin1')
    # unicode/str python2 issues
    return ('sha256=' + encoded.rstrip('='), str(size))  # type: ignore
64
65
def open_for_csv(name, mode):
    # type: (str, Text) -> IO[Any]
    """Open ``name`` in the way the ``csv`` module expects on this Python.

    On Python 2 the file is opened in binary mode.  On Python 3 it is opened
    in text mode with ``newline=''`` (so the csv module controls line
    endings) and an explicit UTF-8 encoding, so that reading and writing
    RECORD does not depend on the locale's preferred encoding.

    :param name: Path of the file to open.
    :param mode: Text-style mode string such as ``'r'`` or ``'w+'``.
    :returns: The opened file object; the caller is responsible for closing.
    """
    if sys.version_info[0] < 3:
        return open(name, mode + 'b')
    return open(name, mode, newline='', encoding='utf-8')
75
76
def fix_script(path):
    # type: (str) -> Optional[bool]
    """Rewrite a ``#!python`` first line to point at ``sys.executable``.

    :param path: Script file to inspect and possibly rewrite in place.
    :returns: ``True`` if the file was rewritten, ``False`` if its first
        line does not start with ``#!python``, and ``None`` if ``path`` is
        not a regular file.
    """
    # XXX RECORD hashes will need to be updated
    if not os.path.isfile(path):
        return None

    with open(path, 'rb') as original:
        shebang = original.readline()
        if not shebang.startswith(b'#!python'):
            return False
        body = original.read()

    interpreter = sys.executable.encode(sys.getfilesystemencoding())
    new_shebang = b'#!' + interpreter + os.linesep.encode("ascii")
    with open(path, 'wb') as rewritten:
        rewritten.write(new_shebang)
        rewritten.write(body)
    return True
96
97
def wheel_root_is_purelib(metadata):
    # type: (Message) -> bool
    """Tell whether ``metadata`` has ``Root-Is-Purelib`` set to "true".

    The comparison is case-insensitive; a missing header counts as false.
    """
    flag = metadata.get("Root-Is-Purelib", "")
    return "true" == flag.lower()
101
102
def get_entrypoints(filename):
    # type: (str) -> Tuple[Dict[str, str], Dict[str, str]]
    """Read console and GUI script entry points from ``filename``.

    :param filename: Path to an ``entry_points.txt``-style file.
    :returns: A ``(console, gui)`` pair of dicts mapping each script name
        to the text that follows ``=`` in its entry point (spaces removed).
        Both dicts are empty when ``filename`` does not exist.
    """
    if not os.path.exists(filename):
        return {}, {}

    # Entry points may have been supplied as free-form strings, so the file
    # is not guaranteed to be a valid INI file.  Stripping leading and
    # trailing whitespace from every line is an attempt to make it one.
    buf = StringIO()
    with open(filename) as fp:
        for raw_line in fp:
            buf.write(raw_line.strip())
            buf.write("\n")
    buf.seek(0)

    parsed = pkg_resources.EntryPoint.parse_map(buf)

    def _name_and_target(entry_point):
        # type: (pkg_resources.EntryPoint) -> Tuple[str, str]
        """Split the space-free string form of an EntryPoint on '='."""
        pieces = str(entry_point).replace(" ", "").split("=")
        return pieces[0], pieces[1]

    def _group_as_strings(group):
        # type: (str) -> Dict[str, str]
        """Convert one entry point group into a name -> target dict."""
        members = parsed.get(group, {})
        return dict(_name_and_target(ep) for ep in members.values())

    console = _group_as_strings('console_scripts')
    gui = _group_as_strings('gui_scripts')
    return console, gui
136
137
def message_about_scripts_not_on_PATH(scripts):
    # type: (Sequence[str]) -> Optional[str]
    """Build a warning about scripts installed outside of PATH.

    :param scripts: Paths of the installed script files.
    :returns: A multi-line warning when at least one script lives in a
        directory that is neither on PATH nor the directory of
        ``sys.executable``; otherwise ``None``.
    """
    if not scripts:
        return None

    # Bucket script names under the directory they were installed in.
    names_by_dir = collections.defaultdict(set)  # type: Dict[str, Set[str]]
    for script_path in scripts:
        directory, script_name = os.path.split(script_path)
        names_by_dir[directory].add(script_name)

    path_entries = os.environ.get("PATH", "").split(os.pathsep)

    # Directories on PATH never trigger a warning.  Neither does the
    # directory holding sys.executable, which covers running a venv's
    # interpreter without activating the venv.
    quiet_dirs = [
        os.path.normcase(entry).rstrip(os.sep) for entry in path_entries
    ]
    quiet_dirs.append(os.path.normcase(os.path.dirname(sys.executable)))

    offending = {}  # type: Dict[str, Set[str]]
    for directory, names in names_by_dir.items():
        if os.path.normcase(directory) in quiet_dirs:
            continue
        offending[directory] = names
    if not offending:
        return None

    # One sentence per offending directory.
    msg_lines = []
    for directory, names in offending.items():
        ordered = sorted(names)  # type: List[str]
        if len(ordered) == 1:
            subject = "script {} is".format(ordered[0])
        else:
            listing = ", ".join(ordered[:-1]) + " and " + ordered[-1]
            subject = "scripts {} are".format(listing)
        msg_lines.append(
            "The {} installed in '{}' which is not on PATH."
            .format(subject, directory)
        )

    advice = (
        "Consider adding {} to PATH or, if you prefer "
        "to suppress this warning, use --no-warn-script-location."
    )
    target = "this directory" if len(msg_lines) == 1 else "these directories"
    msg_lines.append(advice.format(target))

    # Mention PATH entries that begin with '~'.
    if any(entry[0] == "~" for entry in path_entries if entry):
        msg_lines.append(
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )

    return "\n".join(msg_lines)
207
208
def sorted_outrows(outrows):
    # type: (Iterable[InstalledCSVRow]) -> List[InstalledCSVRow]
    """Return the given RECORD rows as a sorted list.

    Each row is a 3-tuple (path, hash, size) corresponding to one record of
    a RECORD file (see PEP 376 and PEP 427 for details).  The size may be
    an int, a string holding an int, or the empty string.
    """
    # There is normally a single row per path, so the hash and size rarely
    # take part in the comparison.  Paths do occasionally occur twice in
    # the wild, though, and the size may then be an int in one row and a
    # string in another.  Comparing string forms of every element keeps
    # the sort deterministic and avoids a TypeError.  Background:
    # https://github.com/pypa/pip/issues/5868
    def _as_strings(row):
        # type: (InstalledCSVRow) -> Tuple[str, ...]
        return tuple(map(str, row))

    ordered = list(outrows)
    ordered.sort(key=_as_strings)
    return ordered
227
228
def get_csv_rows_for_installed(
    old_csv_rows,  # type: Iterable[List[str]]
    installed,  # type: Dict[str, str]
    changed,  # type: Set[str]
    generated,  # type: List[str]
    lib_dir,  # type: str
):
    # type: (...) -> List[InstalledCSVRow]
    """Compute the rows of the RECORD file describing the installation.

    :param old_csv_rows: Rows read from the RECORD shipped in the archive.
        Blank rows are ignored.
    :param installed: A map from archive RECORD path to installation RECORD
        path.  Entries matching a row of ``old_csv_rows`` are popped from
        it; the remainder are emitted with an empty hash and size.
    :param changed: Files modified during installation, whose hash and size
        must be recomputed.  Entries may be filesystem paths or RECORD
        paths relative to ``lib_dir``.
    :param generated: Filesystem paths of files created during the install;
        each is hashed and recorded relative to ``lib_dir``.
    :param lib_dir: Directory that installation RECORD paths are relative
        to.
    :returns: The new rows, in no particular order.
    """
    def _fs_path(path):
        # type: (str) -> str
        """Resolve a RECORD path or filesystem path to an absolute path."""
        # os.path.join leaves an already-absolute ``path`` untouched.
        return os.path.abspath(os.path.join(lib_dir, path))

    # Compare by absolute filesystem path: ``changed`` may hold destination
    # paths while rows hold paths relative to lib_dir, and the two forms
    # never compare equal as plain strings.
    changed_fs_paths = {_fs_path(path) for path in changed}

    installed_rows = []  # type: List[InstalledCSVRow]
    for row in old_csv_rows:
        if not row:
            # csv.reader yields an empty list for a blank line.
            continue
        if len(row) > 3:
            logger.warning(
                'RECORD line has more than three elements: {}'.format(row)
            )
        # Make a copy because we are mutating the row.
        row = list(row)
        old_path = row[0]
        new_path = installed.pop(old_path, old_path)
        row[0] = new_path
        fs_path = _fs_path(new_path)
        if fs_path in changed_fs_paths:
            # Hash the file where it actually lives, not relative to the
            # current working directory.
            digest, length = rehash(fs_path)
            # Tolerate rows that lack the hash and/or size column.
            row.extend([''] * (3 - len(row)))
            row[1] = digest
            row[2] = length
        installed_rows.append(tuple(row))
    for f in generated:
        digest, length = rehash(f)
        installed_rows.append((normpath(f, lib_dir), digest, str(length)))
    for f in installed:
        installed_rows.append((installed[f], '', ''))
    return installed_rows
263
264
class MissingCallableSuffix(Exception):
    """Raised for an entry point specification that has no suffix."""
267
268
def _raise_for_invalid_entrypoint(specification):
    # type: (str) -> None
    """Reject an entry point specification that lacks a suffix.

    :param specification: Entry point text handed to ``get_export_entry``.
    :raises MissingCallableSuffix: if the specification parses to an entry
        whose ``suffix`` is ``None``.
    """
    entry = get_export_entry(specification)
    if entry is None:
        # Nothing was parsed, so there is nothing to validate.
        return
    if entry.suffix is None:
        raise MissingCallableSuffix(str(entry))
274
275
class PipScriptMaker(ScriptMaker):
    """ScriptMaker that validates each specification before using it."""

    def make(self, specification, options=None):
        # type: (str, Dict[str, Any]) -> List[str]
        """Validate ``specification``, then defer to ``ScriptMaker.make``.

        :raises MissingCallableSuffix: if the specification parses to an
            entry point without a suffix.
        """
        _raise_for_invalid_entrypoint(specification)
        parent = super(PipScriptMaker, self)
        return parent.make(specification, options)
281
282
def install_unpacked_wheel(
    name,  # type: str
    wheeldir,  # type: str
    wheel_zip,  # type: ZipFile
    scheme,  # type: Scheme
    req_description,  # type: str
    pycompile=True,  # type: bool
    warn_script_location=True  # type: bool
):
    # type: (...) -> None
    """Install a wheel.

    The unpacked tree is copied into the scheme's purelib or platlib
    directory, each ``*.data/<subdir>`` tree is copied into the scheme
    attribute named ``<subdir>``, script wrappers are generated for the
    declared entry points, and finally INSTALLER and RECORD are written in
    the installed ``.dist-info`` directory.

    :param name: Name of the project to install
    :param wheeldir: Base directory of the unpacked wheel
    :param wheel_zip: open ZipFile for wheel being installed
    :param scheme: Distutils scheme dictating the install directories
    :param req_description: String used in place of the requirement, for
        logging
    :param pycompile: Whether to byte-compile installed Python files
    :param warn_script_location: Whether to check that scripts are installed
        into a directory on PATH
    :raises UnsupportedWheel:
        * when the directory holds an unpacked wheel with incompatible
          Wheel-Version
        * when the .dist-info dir does not match the wheel
    :raises InstallationError: when a script entry point has no callable
        suffix
    """
    # TODO: Investigate and break this up.
    # TODO: Look into moving this into a dedicated class for representing an
    #       installation.

    # Guarantee exactly one trailing separator so that prefix slicing in
    # clobber() yields clean relative paths.
    source = wheeldir.rstrip(os.path.sep) + os.path.sep

    info_dir, metadata = parse_wheel(wheel_zip, name)

    if wheel_root_is_purelib(metadata):
        lib_dir = scheme.purelib
    else:
        lib_dir = scheme.platlib

    subdirs = os.listdir(source)
    data_dirs = [s for s in subdirs if s.endswith('.data')]

    # Record details of the files moved
    #   installed = files copied from the wheel to the destination
    #   changed = files changed while installing (scripts #! line typically)
    #   generated = files newly generated during the install (script wrappers)
    installed = {}  # type: Dict[str, str]
    changed = set()
    generated = []  # type: List[str]

    # Compile all of the pyc files that we're going to be installing
    if pycompile:
        with captured_stdout() as stdout:
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore')
                compileall.compile_dir(source, force=True, quiet=True)
        logger.debug(stdout.getvalue())

    def record_installed(srcfile, destfile, modified=False):
        # type: (str, str, bool) -> None
        """Map archive RECORD paths to installation RECORD paths."""
        oldpath = normpath(srcfile, wheeldir)
        newpath = normpath(destfile, lib_dir)
        installed[oldpath] = newpath
        if modified:
            changed.add(destfile)

    def clobber(
            source,  # type: str
            dest,  # type: str
            is_base,  # type: bool
            fixer=None,  # type: Optional[Callable[[str], Any]]
            filter=None  # type: Optional[Callable[[str], bool]]
    ):
        # type: (...) -> None
        """Copy the tree under ``source`` into ``dest``, replacing files.

        :param is_base: True when ``source`` is the wheel root, in which
            case top-level ``*.data`` directories are not descended into.
        :param fixer: Optional callable run on every copied file; a truthy
            result marks the file as modified.
        :param filter: Optional predicate on the file name; files for which
            it returns true are not installed.
        """
        ensure_dir(dest)  # common for the 'include' path

        for dir, subdirs, files in os.walk(source):
            basedir = dir[len(source):].lstrip(os.path.sep)
            destdir = os.path.join(dest, basedir)
            if is_base and basedir == '':
                # Prune in place so os.walk skips the .data directories.
                subdirs[:] = [s for s in subdirs if not s.endswith('.data')]
            for f in files:
                # Skip unwanted files
                if filter and filter(f):
                    continue
                srcfile = os.path.join(dir, f)
                destfile = os.path.join(dest, basedir, f)
                # directory creation is lazy and after the file filtering above
                # to ensure we don't install empty dirs; empty dirs can't be
                # uninstalled.
                ensure_dir(destdir)

                # copyfile (called below) truncates the destination if it
                # exists and then writes the new contents. This is fine in most
                # cases, but can cause a segfault if pip has loaded a shared
                # object (e.g. from pyopenssl through its vendored urllib3)
                # Since the shared object is mmap'd an attempt to call a
                # symbol in it will then cause a segfault. Unlinking the file
                # allows writing of new contents while allowing the process to
                # continue to use the old copy.
                if os.path.exists(destfile):
                    os.unlink(destfile)

                # We use copyfile (not move, copy, or copy2) to be extra sure
                # that we are not moving directories over (copyfile fails for
                # directories) as well as to ensure that we are not copying
                # over any metadata because we want more control over what
                # metadata we actually copy over.
                shutil.copyfile(srcfile, destfile)

                # Copy over the metadata for the file, currently this only
                # includes the atime and mtime.
                st = os.stat(srcfile)
                if hasattr(os, "utime"):
                    os.utime(destfile, (st.st_atime, st.st_mtime))

                # If our file is executable, then make our destination file
                # executable.  The stat result above is still valid: only
                # the destination has been touched since it was taken.
                if os.access(srcfile, os.X_OK):
                    permissions = (
                        st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
                    )
                    os.chmod(destfile, permissions)

                changed = False
                if fixer:
                    changed = fixer(destfile)
                record_installed(srcfile, destfile, changed)

    clobber(source, lib_dir, True)

    dest_info_dir = os.path.join(lib_dir, info_dir)

    # Get the defined entry points
    ep_file = os.path.join(dest_info_dir, 'entry_points.txt')
    console, gui = get_entrypoints(ep_file)

    def is_entrypoint_wrapper(name):
        # type: (str) -> bool
        """Tell whether ``name`` is a wrapper script for a known entry point."""
        # EP, EP.exe and EP-script.py are scripts generated for
        # entry point EP by setuptools
        if name.lower().endswith('.exe'):
            matchname = name[:-4]
        elif name.lower().endswith('-script.py'):
            matchname = name[:-10]
        elif name.lower().endswith(".pya"):
            matchname = name[:-4]
        else:
            matchname = name
        # Ignore setuptools-generated scripts
        return (matchname in console or matchname in gui)

    for datadir in data_dirs:
        for subdir in os.listdir(os.path.join(wheeldir, datadir)):
            # Reset BOTH hooks for every subdirectory.  os.listdir order is
            # arbitrary, so a filter left over from 'scripts' would otherwise
            # drop files from other subdirectories whose names happen to
            # match an entry point.
            fixer = None
            filter = None
            if subdir == 'scripts':
                fixer = fix_script
                filter = is_entrypoint_wrapper
            source = os.path.join(wheeldir, datadir, subdir)
            dest = getattr(scheme, subdir)
            clobber(source, dest, False, fixer=fixer, filter=filter)

    maker = PipScriptMaker(None, scheme.scripts)

    # Ensure old scripts are overwritten.
    # See https://github.com/pypa/pip/issues/1800
    maker.clobber = True

    # Ensure we don't generate any variants for scripts because this is almost
    # never what somebody wants.
    # See https://bitbucket.org/pypa/distlib/issue/35/
    maker.variants = {''}

    # This is required because otherwise distlib creates scripts that are not
    # executable.
    # See https://bitbucket.org/pypa/distlib/issue/32/
    maker.set_mode = True

    scripts_to_generate = []

    # Special case pip and setuptools to generate versioned wrappers
    #
    # The issue is that some projects (specifically, pip and setuptools) use
    # code in setup.py to create "versioned" entry points - pip2.7 on Python
    # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into
    # the wheel metadata at build time, and so if the wheel is installed with
    # a *different* version of Python the entry points will be wrong. The
    # correct fix for this is to enhance the metadata to be able to describe
    # such versioned entry points, but that won't happen till Metadata 2.0 is
    # available.
    # In the meantime, projects using versioned entry points will either have
    # incorrect versioned entry points, or they will not be able to distribute
    # "universal" wheels (i.e., they will need a wheel per Python version).
    #
    # Because setuptools and pip are bundled with _ensurepip and virtualenv,
    # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we
    # override the versioned entry points in the wheel and generate the
    # correct ones. This code is purely a short-term measure until Metadata 2.0
    # is available.
    #
    # To add the level of hack in this section of code, in order to support
    # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment
    # variable which will control which version scripts get installed.
    #
    # ENSUREPIP_OPTIONS=altinstall
    #   - Only pipX.Y and easy_install-X.Y will be generated and installed
    # ENSUREPIP_OPTIONS=install
    #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
    #     that this option is technically if ENSUREPIP_OPTIONS is set and is
    #     not altinstall
    # DEFAULT
    #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
    #     and easy_install-X.Y.
    pip_script = console.pop('pip', None)
    if pip_script:
        if "ENSUREPIP_OPTIONS" not in os.environ:
            scripts_to_generate.append('pip = ' + pip_script)

        if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
            scripts_to_generate.append(
                'pip%s = %s' % (sys.version_info[0], pip_script)
            )

        scripts_to_generate.append(
            'pip%s = %s' % (get_major_minor_version(), pip_script)
        )
        # Delete any other versioned pip entry points
        pip_ep = [k for k in console if re.match(r'pip(\d(\.\d)?)?$', k)]
        for k in pip_ep:
            del console[k]
    easy_install_script = console.pop('easy_install', None)
    if easy_install_script:
        if "ENSUREPIP_OPTIONS" not in os.environ:
            scripts_to_generate.append(
                'easy_install = ' + easy_install_script
            )

        scripts_to_generate.append(
            'easy_install-%s = %s' % (
                get_major_minor_version(), easy_install_script
            )
        )
        # Delete any other versioned easy_install entry points
        easy_install_ep = [
            k for k in console if re.match(r'easy_install(-\d\.\d)?$', k)
        ]
        for k in easy_install_ep:
            del console[k]

    # Generate the console and GUI entry points specified in the wheel
    scripts_to_generate.extend(
        '%s = %s' % kv for kv in console.items()
    )

    gui_scripts_to_generate = [
        '%s = %s' % kv for kv in gui.items()
    ]

    generated_console_scripts = []  # type: List[str]

    try:
        generated_console_scripts = maker.make_multiple(scripts_to_generate)
        generated.extend(generated_console_scripts)

        generated.extend(
            maker.make_multiple(gui_scripts_to_generate, {'gui': True})
        )
    except MissingCallableSuffix as e:
        entry = e.args[0]
        raise InstallationError(
            "Invalid script entry point: {} for req: {} - A callable "
            "suffix is required. Cf https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information.".format(entry, req_description)
        )

    if warn_script_location:
        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
        if msg is not None:
            logger.warning(msg)

    # Record pip as the installer.  The file is written under a temporary
    # name and then moved into place.
    installer = os.path.join(dest_info_dir, 'INSTALLER')
    temp_installer = os.path.join(dest_info_dir, 'INSTALLER.pip')
    with open(temp_installer, 'wb') as installer_file:
        installer_file.write(b'pip\n')
    shutil.move(temp_installer, installer)
    generated.append(installer)

    # Record details of all files installed.  The new RECORD is likewise
    # written under a temporary name and moved over the old one afterwards.
    record = os.path.join(dest_info_dir, 'RECORD')
    temp_record = os.path.join(dest_info_dir, 'RECORD.pip')
    with open_for_csv(record, 'r') as record_in:
        with open_for_csv(temp_record, 'w+') as record_out:
            reader = csv.reader(record_in)
            outrows = get_csv_rows_for_installed(
                reader, installed=installed, changed=changed,
                generated=generated, lib_dir=lib_dir,
            )
            writer = csv.writer(record_out)
            # Sort to simplify testing.
            for row in sorted_outrows(outrows):
                writer.writerow(row)
    shutil.move(temp_record, record)
591
592
def install_wheel(
    name,  # type: str
    wheel_path,  # type: str
    scheme,  # type: Scheme
    req_description,  # type: str
    pycompile=True,  # type: bool
    warn_script_location=True,  # type: bool
    _temp_dir_for_testing=None,  # type: Optional[str]
):
    # type: (...) -> None
    """Unpack the wheel at ``wheel_path`` and install its contents.

    The archive is unpacked into a ``TempDirectory`` and also opened as a
    ``ZipFile``; both are handed to ``install_unpacked_wheel`` together
    with the remaining arguments, which are forwarded unchanged.

    :param _temp_dir_for_testing: Passed to ``TempDirectory`` as its
        ``path`` argument.
    """
    with TempDirectory(
        path=_temp_dir_for_testing, kind="unpacked-wheel"
    ) as unpacked_dir:
        with ZipFile(wheel_path, allowZip64=True) as z:
            unpack_file(wheel_path, unpacked_dir.path)
            install_unpacked_wheel(
                name=name,
                wheeldir=unpacked_dir.path,
                wheel_zip=z,
                scheme=scheme,
                req_description=req_description,
                pycompile=pycompile,
                warn_script_location=warn_script_location,
            )
616