1from __future__ import annotations
2
3import os
4import re
5import shlex
6import shutil
7import subprocess
8import sys
9import tempfile
10from contextlib import contextmanager
11from pathlib import Path
12from typing import TYPE_CHECKING, Generator
13
14from pdm import termui
15from pdm.exceptions import BuildError
16from pdm.models import pip_shims
17from pdm.models.auth import make_basic_auth
18from pdm.models.in_process import (
19    get_pep508_environment,
20    get_python_abi_tag,
21    get_sys_config_paths,
22)
23from pdm.models.working_set import WorkingSet
24from pdm.utils import cached_property, get_finder, is_venv_python, pdm_scheme
25
26if TYPE_CHECKING:
27    from pdm._types import Source
28    from pdm.models.python import PythonInfo
29    from pdm.project import Project
30
31
32def _get_shebang_path(executable: str, is_launcher: bool) -> bytes:
33    """Get the interpreter path in the shebang line
34
35    The launcher can just use the command as-is.
36    Otherwise if the path contains whitespace or is too long, both distlib
37    and installer use a clever hack to make the shebang after ``/bin/sh``,
38    where the interpreter path is quoted.
39    """
40    if is_launcher or " " not in executable and (len(executable) + 3) <= 127:
41        return executable.encode("utf-8")
42    return shlex.quote(executable).encode("utf-8")
43
44
45def _replace_shebang(contents: bytes, new_executable: bytes) -> bytes:
46    """Replace the python executable from the shebeng line, which can be in two forms:
47
48    1. #!python_executable
49    2. #!/bin/sh
50       '''exec' '/path to/python' "$0" "$@"
51       ' '''
52    """
53    _complex_shebang_re = rb"^'''exec' ('.+?') \"\$0\""
54    _simple_shebang_re = rb"^#!(.+?)\s*$"
55    match = re.search(_complex_shebang_re, contents, flags=re.M)
56    if match:
57        return contents.replace(match.group(1), new_executable, 1)
58    else:
59        match = re.search(_simple_shebang_re, contents, flags=re.M)
60        assert match is not None
61        return contents.replace(match.group(1), new_executable, 1)
62
63
class Environment:
    """Environment dependent stuff related to the selected Python interpreter."""

    is_global = False

    def __init__(self, project: Project) -> None:
        """
        :param project: the project instance
        """
        self.python_requires = project.python_requires
        self.project = project
        self.interpreter: PythonInfo = project.python
        self._essential_installed = False
        # Built once so that the same auth object can be shared by every finder
        # session created in :meth:`get_finder`.
        self.auth = make_basic_auth(
            self.project.sources, self.project.core.ui.verbosity >= termui.DETAIL
        )

    def get_paths(self) -> dict[str, str]:
        """Get paths like ``sysconfig.get_paths()`` for installation."""
        return pdm_scheme(str(self.packages_path))

    @cached_property
    def packages_path(self) -> Path:
        """The local packages path.

        This is ``<project root>/__pypackages__/<interpreter identifier>``. The
        directory and its scripts/include/lib sub-directories are created on
        first access.
        """
        pypackages = (
            self.project.root  # type: ignore
            / "__pypackages__"
            / self.interpreter.identifier
        )
        # When the ``-32`` suffixed directory is absent, reuse the directory
        # named without the suffix if that one already exists.
        if not pypackages.exists() and pypackages.name.endswith("-32"):
            compatible_packages = pypackages.with_name(pypackages.name[:-3])
            if compatible_packages.exists():
                pypackages = compatible_packages
        scripts = "Scripts" if os.name == "nt" else "bin"
        if not pypackages.parent.exists():
            pypackages.parent.mkdir(parents=True)
            # Keep the installed packages out of version control.
            pypackages.parent.joinpath(".gitignore").write_text("*\n!.gitignore\n")
        for subdir in [scripts, "include", "lib"]:
            pypackages.joinpath(subdir).mkdir(exist_ok=True, parents=True)
        return pypackages

    @contextmanager
    def get_finder(
        self,
        sources: list[Source] | None = None,
        ignore_requires_python: bool = False,
    ) -> Generator[pip_shims.PackageFinder, None, None]:
        """Return the package finder of given index sources.

        The finder's session is closed when the ``with`` block exits, whether
        it exits normally or by an exception.

        :param sources: a list of sources the finder should search in.
            Defaults to the sources of the project.
        :param ignore_requires_python: whether to ignore the python version constraint.
        """
        if sources is None:
            sources = self.project.sources

        python_version = self.interpreter.version_tuple
        python_abi_tag = get_python_abi_tag(self.interpreter.executable)
        finder = get_finder(
            sources,
            self.project.cache_dir.as_posix(),
            python_version,
            python_abi_tag,
            ignore_requires_python,
        )
        # Reuse the auth across sessions to avoid prompting repeatedly.
        finder.session.auth = self.auth  # type: ignore
        try:
            yield finder
        finally:
            # Always release the session, even if the caller's block raised.
            finder.session.close()  # type: ignore

    def get_working_set(self) -> WorkingSet:
        """Get the working set based on local packages directory."""
        paths = self.get_paths()
        return WorkingSet([paths["platlib"], paths["purelib"]])

    @cached_property
    def marker_environment(self) -> dict[str, str]:
        """Get environment for marker evaluation"""
        return get_pep508_environment(self.interpreter.executable)

    def which(self, command: str) -> str | None:
        """Get the full path of the given executable against this environment.

        A relative ``python``, ``pythonX`` or ``pythonX.Y`` command (optionally
        ending with ``.exe``) resolves to the selected interpreter when the
        requested version agrees with it. Everything else is searched for in
        the scripts directory, then ``PATH``, then the interpreter's directory.

        :param command: the command name or path to look up
        :return: the resolved path, or None if the command is not found
        """
        if not os.path.isabs(command) and command.startswith("python"):
            # Only drop a trailing ".exe"; ``os.path.splitext`` would treat the
            # minor version of "python3.9" as a file extension.
            name = command[:-4] if command.lower().endswith(".exe") else command
            version = name[len("python") :]
            this_version = str(self.interpreter.version)
            # Match on whole version components so "3.1" does not match "3.10".
            if (
                not version
                or this_version == version
                or this_version.startswith(version + ".")
            ):
                return self.interpreter.executable
        # Fallback to use shutil.which to find the executable
        this_path = self.get_paths()["scripts"]
        python_root = os.path.dirname(self.interpreter.executable)
        new_path = os.pathsep.join([this_path, os.getenv("PATH", ""), python_root])
        return shutil.which(command, path=new_path)

    def update_shebangs(self, new_path: str) -> None:
        """Update the shebang lines of the files in the scripts directory.

        :param new_path: the interpreter path to write into the shebang lines
        """
        scripts = self.get_paths()["scripts"]
        for child in Path(scripts).iterdir():
            if not child.is_file() or child.suffix not in (".exe", ".py", ""):
                continue
            # ``.exe`` files are launchers, which take the path unquoted.
            is_launcher = child.suffix == ".exe"
            new_shebang = _get_shebang_path(new_path, is_launcher)
            child.write_bytes(_replace_shebang(child.read_bytes(), new_shebang))

    def _download_pip_wheel(self, path: str | Path) -> None:
        """Download a ``pip<21`` wheel with the host pip and move it to ``path``.

        :param path: the destination of the downloaded wheel file
        :raises BuildError: if the download fails or yields no pip wheel
        """
        dirname = Path(tempfile.mkdtemp(prefix="pip-download-"))
        try:
            subprocess.check_call(
                [
                    getattr(sys, "_original_executable", sys.executable),
                    "-m",
                    "pip",
                    "download",
                    "--only-binary=:all:",
                    "-d",
                    str(dirname),
                    "pip<21",  # pip>=21 drops the support of py27
                ],
            )
            wheel_file = next(dirname.glob("pip-*.whl"), None)
            if wheel_file is None:
                # Report it as a build failure instead of leaking StopIteration.
                raise BuildError("Failed to download pip for the given interpreter")
            shutil.move(str(wheel_file), path)
        except subprocess.CalledProcessError as err:
            raise BuildError(
                "Failed to download pip for the given interpreter"
            ) from err
        finally:
            # The temporary download directory is removed in every case.
            shutil.rmtree(dirname, ignore_errors=True)

    @cached_property
    def pip_command(self) -> list[str]:
        """Get a pip command for this environment, and download one if not available.
        Return a list of args like ['python', '-m', 'pip']
        """
        from pip import __file__ as pip_location

        python_major = self.interpreter.major
        executable = self.interpreter.executable
        # Probe whether the selected interpreter can already run pip.
        proc = subprocess.run(
            [executable, "-Esm", "pip", "--version"], capture_output=True
        )
        if proc.returncode == 0:
            # The pip has already been installed with the executable, just use it
            return [executable, "-Esm", "pip"]
        if python_major == 3:
            # Use the host pip package.
            return [executable, "-Es", os.path.dirname(pip_location)]
        # For py2, only pip<21 is eligible, download a pip wheel from the Internet.
        pip_wheel = self.project.cache_dir / "pip.whl"
        if not pip_wheel.is_file():
            self._download_pip_wheel(pip_wheel)
        # Point the interpreter at the ``pip`` package inside the wheel file.
        return [executable, str(pip_wheel / "pip")]
212
213
class GlobalEnvironment(Environment):
    """Global environment

    Installation paths are read from the interpreter itself and there is no
    local packages directory.
    """

    is_global = True

    def get_paths(self) -> dict[str, str]:
        """Get the installation paths reported by the interpreter.

        Inside a virtualenv the ``include`` path is redirected to
        ``<data>/include/site/python<identifier>``. ``prefix`` and ``headers``
        are then filled in from ``data`` and ``include`` respectively.
        """
        paths = get_sys_config_paths(self.interpreter.executable)
        if is_venv_python(self.interpreter.executable):
            site_include = os.path.join(
                paths["data"],
                "include",
                "site",
                f"python{self.interpreter.identifier}",
            )
            paths["include"] = site_include
        paths.update(prefix=paths["data"], headers=paths["include"])
        return paths

    @property
    def packages_path(self) -> Path | None:  # type: ignore
        """Always None: the global environment has no local packages path."""
        return None
231