from __future__ import annotations

import hashlib
import json
import os
import re
import shutil
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable, Type, cast
from urllib.parse import urlparse

import atoml
from pythonfinder import Finder
from pythonfinder.environment import PYENV_INSTALLED, PYENV_ROOT

from pdm import termui
from pdm._types import Source
from pdm.exceptions import NoPythonVersion, PdmUsageError, ProjectError
from pdm.models import pip_shims
from pdm.models.caches import CandidateInfoCache, HashCache
from pdm.models.candidates import Candidate
from pdm.models.environment import Environment, GlobalEnvironment
from pdm.models.python import PythonInfo
from pdm.models.repositories import BaseRepository, LockedRepository, PyPIRepository
from pdm.models.requirements import Requirement, parse_requirement
from pdm.models.specifiers import PySpecSet, get_specifier
from pdm.project.config import Config
from pdm.project.metadata import MutableMetadata as Metadata
from pdm.utils import (
    atomic_open_for_write,
    cached_property,
    cd,
    expand_env_vars_in_auth,
    find_project_root,
    find_python_in_path,
    get_in_project_venv_python,
    get_venv_like_prefix,
)

if TYPE_CHECKING:
    from resolvelib.reporters import BaseReporter

    from pdm._vendor import halo
    from pdm.core import Core
    from pdm.resolver.providers import BaseProvider


class Project:
    """Core project class"""

    PYPROJECT_FILENAME = "pyproject.toml"
    DEPENDENCIES_RE = re.compile(r"(?:(.+?)-)?dependencies")
    LOCKFILE_VERSION = "3.1"
    GLOBAL_PROJECT = Path.home() / ".pdm" / "global-project"

    def __init__(
        self, core: Core, root_path: str | Path | None, is_global: bool = False
    ) -> None:
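        """Initialize the project.

        When ``root_path`` is None, the project root is discovered from the
        current directory (up to ``project_max_depth`` parent levels), and may
        fall back to the global project when ``auto_global`` is enabled.
        """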
        self._pyproject: dict | None = None
        self._lockfile: dict | None = None
        self._environment: Environment | None = None
        self._python: PythonInfo | None = None
        self.core = core

        if root_path is None:
            root_path = (
                find_project_root(max_depth=self.global_config["project_max_depth"])
                if not is_global
                else self.GLOBAL_PROJECT
            )
        if not is_global and root_path is None and self.global_config["auto_global"]:
            self.core.ui.echo(
                "Project is not found, falling back to the global project",
                fg="yellow",
                err=True,
            )
            root_path = self.GLOBAL_PROJECT
            is_global = True

        self.root = Path(root_path or "").absolute()
        self.is_global = is_global
        self.init_global_project()

    def __repr__(self) -> str:
        return f"<Project '{self.root.as_posix()}'>"

    @property
    def pyproject_file(self) -> Path:
        return self.root / self.PYPROJECT_FILENAME

    @property
    def lockfile_file(self) -> Path:
        return self.root / "pdm.lock"

    @property
    def pyproject(self) -> dict | None:
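        """The parsed content of pyproject.toml, loaded lazily and cached."""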
        if not self._pyproject and self.pyproject_file.exists():
            data = atoml.parse(self.pyproject_file.read_text("utf-8"))
            self._pyproject = cast(dict, data)
        return self._pyproject

    @pyproject.setter
    def pyproject(self, data: dict[str, Any]) -> None:
        self._pyproject = data

    @property
    def tool_settings(self) -> dict:
        data = self.pyproject
        if not data:
            return {}
        return data.setdefault("tool", {}).setdefault("pdm", {})

    @property
    def lockfile(self) -> dict:
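        """The parsed content of pdm.lock.

        :raises ProjectError: if the lock file doesn't exist.
        """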
        if not self._lockfile:
            if not self.lockfile_file.is_file():
                raise ProjectError("Lock file does not exist.")
            data = atoml.parse(self.lockfile_file.read_text("utf-8"))
            self._lockfile = cast(dict, data)
        return self._lockfile

    @lockfile.setter
    def lockfile(self, data: dict[str, Any]) -> None:
        self._lockfile = data

    @property
    def config(self) -> dict[str, Any]:
        """A read-only dict of the merged (global + project) configuration.
        Modifications to it won't be saved back to any config file.
        """
        result = dict(self.global_config)
        result.update(self.project_config)
        return result

    @property
    def scripts(self) -> dict[str, str | dict[str, str]]:
        return self.tool_settings.get("scripts")  # type: ignore

    @cached_property
    def global_config(self) -> Config:
        """Read-and-writable configuration dict for global settings"""
        return Config(self.GLOBAL_PROJECT.with_name("config.toml"), is_global=True)

    @cached_property
    def project_config(self) -> Config:
        """Read-and-writable configuration dict for project settings"""
        return Config(self.root / ".pdm.toml")

    @property
    def python(self) -> PythonInfo:
        if not self._python:
            self._python = self.resolve_interpreter()
        return self._python

    @python.setter
    def python(self, value: PythonInfo) -> None:
        self._python = value
        self.project_config["python.path"] = value.path

    @property
    def python_executable(self) -> str:
        """For backward compatibility"""
        return self.python.executable

    def resolve_interpreter(self) -> PythonInfo:
        """Get the Python interpreter path."""
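        # Resolution order: the interpreter saved in python.path (unless
        # PDM_IGNORE_SAVED_PYTHON is set), then an activated virtualenv/conda
        # environment when use_venv is enabled, and finally the first system
        # interpreter that satisfies requires-python.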
        config = self.config
        if self.project_config.get("python.path") and not os.getenv(
            "PDM_IGNORE_SAVED_PYTHON"
        ):
            saved_path = self.project_config["python.path"]
            try:
                return PythonInfo.from_path(saved_path)
            except (ValueError, FileNotFoundError):
                del self.project_config["python.path"]
        if os.name == "nt":
            suffix = ".exe"
            scripts = "Scripts"
        else:
            suffix = ""
            scripts = "bin"

        # Resolve virtual environments from env-vars
        virtual_env = os.getenv("VIRTUAL_ENV", os.getenv("CONDA_PREFIX"))
        if config["use_venv"] and virtual_env:
            return PythonInfo.from_path(
                os.path.join(virtual_env, scripts, f"python{suffix}")
            )

        for py_version in self.find_interpreters():
            if self.python_requires.contains(str(py_version.version)):
                self.python = py_version
                return py_version

        raise NoPythonVersion(
            "No Python that satisfies {} is found on the system.".format(
                self.python_requires
            )
        )

    def get_environment(self) -> Environment:
        """Get the environment selected by this project"""
        if self.is_global:
            env = GlobalEnvironment(self)
            # Rewrite global project's python requires to be
            # compatible with the exact version
            env.python_requires = PySpecSet(f"=={self.python.version}")
            return env
        if self.config["use_venv"] and get_venv_like_prefix(self.python.executable):
            # Only recognize venv created by python -m venv and virtualenv>20
            return GlobalEnvironment(self)
        return Environment(self)

    @property
    def environment(self) -> Environment:
        if not self._environment:
            self._environment = self.get_environment()
        return self._environment

    @environment.setter
    def environment(self, value: Environment) -> None:
        self._environment = value

    @property
    def python_requires(self) -> PySpecSet:
        return PySpecSet(self.meta.requires_python)

    def get_dependencies(self, group: str | None = None) -> dict[str, Requirement]:
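        """Get the requirements of the named dependency group.

        ``None`` or ``"default"`` maps to [project.dependencies]; any other
        group is looked up in [project.optional-dependencies] first and then
        in [tool.pdm.dev-dependencies].
        """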
        metadata = self.meta
        optional_dependencies = metadata.get("optional-dependencies", {})
        dev_dependencies = self.tool_settings.get("dev-dependencies", {})
        if group in (None, "default"):
            deps = metadata.get("dependencies", [])
        else:
            if group in optional_dependencies and group in dev_dependencies:
                self.core.ui.echo(
                    f"The {group} group exists in both [optional-dependencies] "
                    "and [dev-dependencies], the former takes precedence.",
                    err=True,
                    fg="yellow",
                )
            if group in optional_dependencies:
                deps = optional_dependencies[group]
            elif group in dev_dependencies:
                deps = dev_dependencies[group]
            else:
                raise PdmUsageError(f"Non-existent group {group}")
        result = {}
        with cd(self.root):
            for line in deps:
                if line.startswith("-e "):
                    req = parse_requirement(line[3:].strip(), True)
                else:
                    req = parse_requirement(line)
                # Keep editable requirements after the normal ones so that they
                # override correctly.
                result[req.identify()] = req
        return result

    @property
    def dependencies(self) -> dict[str, Requirement]:
        return self.get_dependencies()

    @property
    def dev_dependencies(self) -> dict[str, Requirement]:
        """All development dependencies"""
        dev_group = self.tool_settings.get("dev-dependencies", {})
        if not dev_group:
            return {}
        result = {}
        with cd(self.root):
            for _, deps in dev_group.items():
                for line in deps:
                    if line.startswith("-e "):
                        req = parse_requirement(line[3:].strip(), True)
                    else:
                        req = parse_requirement(line)
                    result[req.identify()] = req
        return result

    def iter_groups(self) -> Iterable[str]:
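        """Return all dependency group names, including 'default'."""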
        groups = {"default"}
        if self.meta.optional_dependencies:
            groups.update(self.meta.optional_dependencies.keys())
        if self.tool_settings.get("dev-dependencies"):
            groups.update(self.tool_settings["dev-dependencies"].keys())
        return groups

    @property
    def all_dependencies(self) -> dict[str, dict[str, Requirement]]:
        return {group: self.get_dependencies(group) for group in self.iter_groups()}

    @property
    def allow_prereleases(self) -> bool | None:
        return self.tool_settings.get("allow_prereleases")

    @property
    def sources(self) -> list[Source]:
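        """The configured package sources.

        The default PyPI index is prepended when no source named "pypi" exists,
        and auth placeholders in the URLs are expanded from environment variables.
        """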
        sources = list(self.tool_settings.get("source", []))
        if all(source.get("name") != "pypi" for source in sources):
            sources.insert(
                0,
                {
                    "url": self.config["pypi.url"],
                    "verify_ssl": self.config["pypi.verify_ssl"],
                    "name": "pypi",
                },
            )
        expanded_sources: list[Source] = [
            Source(
                url=expand_env_vars_in_auth(s["url"]),
                verify_ssl=s.get("verify_ssl", True),
                name=s.get("name", urlparse(s["url"]).hostname),
                type=s.get("type", "index"),
            )
            for s in sources
        ]
        return expanded_sources

    def get_repository(self, cls: Type[BaseRepository] | None = None) -> BaseRepository:
        """Get the repository object"""
        if cls is None:
            cls = PyPIRepository
        sources = self.sources or []
        return cls(sources, self.environment)

    @property
    def locked_repository(self) -> LockedRepository:
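        """A repository backed by a deep copy of the lock file.

        It is empty when the lock file doesn't exist yet.
        """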
        import copy

        try:
            lockfile = copy.deepcopy(self.lockfile)
        except ProjectError:
            lockfile = {}

        return LockedRepository(lockfile, self.sources, self.environment)

    def get_provider(
        self,
        strategy: str = "all",
        tracked_names: Iterable[str] | None = None,
        for_install: bool = False,
    ) -> BaseProvider:
        """Build a provider instance for the resolver.

        :param strategy: the resolve strategy
        :param tracked_names: the names of packages that need to be updated
        :param for_install: if the provider is for install
        :returns: The provider object
        """

        from pdm.resolver.providers import (
            BaseProvider,
            EagerUpdateProvider,
            ReusePinProvider,
        )

        repository = self.get_repository(cls=self.core.repository_class)
        allow_prereleases = self.allow_prereleases
        if strategy != "all" and not self.is_lockfile_compatible():
            self.core.ui.echo(
                "Updating the whole lock file as it is not compatible with PDM",
                fg="yellow",
                err=True,
            )
            strategy = "all"
        if not for_install and strategy == "all":
            return BaseProvider(repository, allow_prereleases)

        locked_repository = self.locked_repository
        if for_install:
            return BaseProvider(locked_repository, allow_prereleases)
        provider_class = (
            ReusePinProvider if strategy == "reuse" else EagerUpdateProvider
        )
        return provider_class(
            locked_repository.all_candidates,
            tracked_names or (),
            repository,
            allow_prereleases,
        )

    def get_reporter(
        self,
        requirements: list[Requirement],
        tracked_names: Iterable[str] | None = None,
        spinner: halo.Halo | termui.DummySpinner | None = None,
    ) -> BaseReporter:
        """Return the reporter object to construct a resolver.

        :param requirements: requirements to resolve
        :param tracked_names: the names of packages that need to be updated
        :param spinner: optional spinner object
        :returns: a reporter
        """
        from pdm.resolver.reporters import SpinnerReporter

        return SpinnerReporter(spinner or termui.DummySpinner(), requirements)

    def get_lock_metadata(self) -> dict[str, Any]:
        content_hash = atoml.string("sha256:" + self.get_content_hash("sha256"))
        content_hash.trivia.trail = "\n\n"
        return {"lock_version": self.LOCKFILE_VERSION, "content_hash": content_hash}

    def write_lockfile(
        self, toml_data: dict, show_message: bool = True, write: bool = True
    ) -> None:
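        """Write the lock data to pdm.lock.

        When ``write`` is False, the data is only kept in memory.
        """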
        toml_data["metadata"].update(self.get_lock_metadata())

        if write:
            with atomic_open_for_write(self.lockfile_file) as fp:
                atoml.dump(toml_data, fp)  # type: ignore
            if show_message:
                self.core.ui.echo(f"Changes are written to {termui.green('pdm.lock')}.")
            self._lockfile = None
        else:
            self._lockfile = toml_data

    def make_self_candidate(self, editable: bool = True) -> Candidate:
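        """Build a candidate for the project itself as a local URL requirement."""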
        req = parse_requirement(pip_shims.path_to_url(self.root.as_posix()), editable)
        req.name = self.meta.name
        return Candidate(
            req, self.environment, name=self.meta.name, version=self.meta.version
        )

    def get_content_hash(self, algo: str = "md5") -> str:
        # Only hash the sources and dependency groups; changes to any other
        # field leave the lock file considered up to date.
        dump_data = {
            "sources": self.tool_settings.get("source", []),
            "dependencies": self.meta.get("dependencies", []),
            "dev-dependencies": self.tool_settings.get("dev-dependencies", {}),
            "optional-dependencies": self.meta.get("optional-dependencies", {}),
            "requires-python": self.meta.get("requires-python", ""),
        }
        pyproject_content = json.dumps(dump_data, sort_keys=True)
        hasher = hashlib.new(algo)
        hasher.update(pyproject_content.encode("utf-8"))
        return hasher.hexdigest()

    def is_lockfile_hash_match(self) -> bool:
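        """Check whether the content hash stored in the lock file matches the
        current pyproject.toml content.
        """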
        if not self.lockfile_file.exists():
            return False
        hash_in_lockfile = str(
            self.lockfile.get("metadata", {}).get("content_hash", "")
        )
        if not hash_in_lockfile:
            return False
        algo, hash_value = hash_in_lockfile.split(":")
        content_hash = self.get_content_hash(algo)
        return content_hash == hash_value

    def is_lockfile_compatible(self) -> bool:
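        """Check whether the lock file version is compatible with this PDM,
        i.e. it has the same major version and is not newer than LOCKFILE_VERSION.
        """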
        if not self.lockfile_file.exists():
            return True
        lockfile_version = str(
            self.lockfile.get("metadata", {}).get("lock_version", "")
        )
        if not lockfile_version:
            return False
        if "." not in lockfile_version:
            lockfile_version += ".0"
        accepted = get_specifier(f"~={lockfile_version}")
        return accepted.contains(self.LOCKFILE_VERSION)

    def get_pyproject_dependencies(self, group: str, dev: bool = False) -> list[str]:
        """Get the dependencies array in the pyproject.toml"""
        if group == "default":
            return self.meta.setdefault("dependencies", [])
        else:
            deps_dict = {
                False: self.meta.setdefault("optional-dependencies", {}),
                True: self.tool_settings.setdefault("dev-dependencies", {}),
            }
            for deps in deps_dict.values():
                if group in deps:
                    return deps[group]
            return deps_dict[dev].setdefault(group, [])

    def add_dependencies(
        self,
        requirements: dict[str, Requirement],
        to_group: str = "default",
        dev: bool = False,
        show_message: bool = True,
    ) -> None:
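        """Add the requirements to pyproject.toml, replacing any existing entry
        that matches the same requirement, then write the file back.
        """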
        deps = self.get_pyproject_dependencies(to_group, dev).multiline(  # type: ignore
            True
        )
        for _, dep in requirements.items():
            matched_index = next(
                (i for i, r in enumerate(deps) if dep.matches(r)), None
            )
            if matched_index is None:
                deps.append(dep.as_line())
            else:
                req = dep.as_line()
                deps[matched_index] = req
        self.write_pyproject(show_message)

    def write_pyproject(self, show_message: bool = True) -> None:
        with atomic_open_for_write(
            self.pyproject_file.as_posix(), encoding="utf-8"
        ) as f:
            atoml.dump(self.pyproject, f)  # type: ignore
        if show_message:
            self.core.ui.echo(
                f"Changes are written to {termui.green('pyproject.toml')}."
            )
        self._pyproject = None

    @property
    def meta(self) -> Metadata:
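        """The project metadata, with legacy dynamic ``version`` and
        ``classifiers`` fields migrated on access.
        """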
        if not self.pyproject:
            self.pyproject = {"project": atoml.table()}
        project_meta = self.pyproject["project"]
        meta_version = project_meta.get("version")
        updated = False
        # Move version to tool table
        if isinstance(meta_version, dict):
            if "version" in self.tool_settings:
                self.core.ui.echo(
                    "WARNING: Removing dynamic `version` from [project] table",
                    fg="yellow",
                    err=True,
                )
            else:
                self.core.ui.echo(
                    "WARNING: Moving dynamic `version` from [project] to [tool.pdm]",
                    fg="yellow",
                    err=True,
                )
                self.tool_settings["version"] = meta_version
            del project_meta["version"]
            updated = True
        # Delete classifiers from dynamic
        dynamic_fields = project_meta.get("dynamic", [])
        if "classifiers" in dynamic_fields:
            self.core.ui.echo(
                "WARNING: Dynamic `classifiers` is no longer supported, "
                "please supply all classifiers manually",
                fg="yellow",
                err=True,
            )
            dynamic_fields.remove("classifiers")
            if not dynamic_fields:
                del project_meta["dynamic"]
            updated = True
        if updated:
            self.write_pyproject(False)
        m = Metadata(self.pyproject_file, False)
        m._metadata = self.pyproject.get("project", {})
        m._tool_settings = self.tool_settings
        return m

    def init_global_project(self) -> None:
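        """Create a minimal pyproject.toml for the global project if it is missing."""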
        if not self.is_global:
            return
        if not self.pyproject_file.exists():
            self.root.mkdir(parents=True, exist_ok=True)
            self.pyproject_file.write_text(
                """\
[project]
dependencies = ["pip", "setuptools", "wheel"]
"""
            )
            self._pyproject = None

    @property
    def cache_dir(self) -> Path:
        return Path(self.config.get("cache_dir", ""))

    def cache(self, name: str) -> Path:
        path = self.cache_dir / name
        path.mkdir(parents=True, exist_ok=True)
        return path

    def make_wheel_cache(self) -> pip_shims.WheelCache:
        return pip_shims.WheelCache(
            self.cache_dir.as_posix(), pip_shims.FormatControl(set(), set())
        )

    def make_candidate_info_cache(self) -> CandidateInfoCache:
        python_hash = hashlib.sha1(
            str(self.environment.python_requires).encode()
        ).hexdigest()
        file_name = f"package_meta_{python_hash}.json"
        return CandidateInfoCache(self.cache("metadata") / file_name)

    def make_hash_cache(self) -> HashCache:
        return HashCache(directory=self.cache("hashes").as_posix())

    def find_interpreters(self, python_spec: str | None = None) -> Iterable[PythonInfo]:
        """Return an iterable of interpreters that match the given specifier,
        which can be:
            1. a version specifier like 3.7
            2. an absolute path
            3. a short name like python3
            4. None, which returns all possible interpreters
        """
        config = self.config
        python: str | Path | None = None

        if not python_spec:
            if config.get("python.use_pyenv", True) and PYENV_INSTALLED:
                pyenv_shim = os.path.join(PYENV_ROOT, "shims", "python3")
                if os.name == "nt":
                    pyenv_shim += ".bat"
                if os.path.exists(pyenv_shim):
                    yield PythonInfo.from_path(pyenv_shim)
                elif os.path.exists(pyenv_shim.replace("python3", "python")):
                    yield PythonInfo.from_path(pyenv_shim.replace("python3", "python"))
            if config.get("use_venv"):
                python = get_in_project_venv_python(self.root)
                if python:
                    yield PythonInfo.from_path(python)
            python = shutil.which("python")
            if python:
                yield PythonInfo.from_path(python)
            args = []
        else:
            if not all(c.isdigit() for c in python_spec.split(".")):
                if Path(python_spec).exists():
                    python = find_python_in_path(python_spec)
                    if python:
                        yield PythonInfo.from_path(python)
                else:
                    python = shutil.which(python_spec)
                    if python:
                        yield PythonInfo.from_path(python)
                return
            args = [int(v) for v in python_spec.split(".") if v != ""]
        finder = Finder()
        for entry in finder.find_all_python_versions(*args):
            yield PythonInfo.from_python_version(entry.py_version)
        if not python_spec:
            this_python = getattr(sys, "_base_executable", sys.executable)
            yield PythonInfo.from_path(this_python)