1from __future__ import annotations 2 3import hashlib 4import json 5import os 6import re 7import shutil 8import sys 9from pathlib import Path 10from typing import TYPE_CHECKING, Any, Iterable, Type, cast 11from urllib.parse import urlparse 12 13import atoml 14from pythonfinder import Finder 15from pythonfinder.environment import PYENV_INSTALLED, PYENV_ROOT 16 17from pdm import termui 18from pdm._types import Source 19from pdm.exceptions import NoPythonVersion, PdmUsageError, ProjectError 20from pdm.models import pip_shims 21from pdm.models.caches import CandidateInfoCache, HashCache 22from pdm.models.candidates import Candidate 23from pdm.models.environment import Environment, GlobalEnvironment 24from pdm.models.python import PythonInfo 25from pdm.models.repositories import BaseRepository, LockedRepository, PyPIRepository 26from pdm.models.requirements import Requirement, parse_requirement 27from pdm.models.specifiers import PySpecSet, get_specifier 28from pdm.project.config import Config 29from pdm.project.metadata import MutableMetadata as Metadata 30from pdm.utils import ( 31 atomic_open_for_write, 32 cached_property, 33 cd, 34 expand_env_vars_in_auth, 35 find_project_root, 36 find_python_in_path, 37 get_in_project_venv_python, 38 get_venv_like_prefix, 39) 40 41if TYPE_CHECKING: 42 from resolvelib.reporters import BaseReporter 43 44 from pdm._vendor import halo 45 from pdm.core import Core 46 from pdm.resolver.providers import BaseProvider 47 48 49class Project: 50 """Core project class""" 51 52 PYPROJECT_FILENAME = "pyproject.toml" 53 DEPENDENCIES_RE = re.compile(r"(?:(.+?)-)?dependencies") 54 LOCKFILE_VERSION = "3.1" 55 GLOBAL_PROJECT = Path.home() / ".pdm" / "global-project" 56 57 def __init__( 58 self, core: Core, root_path: str | Path | None, is_global: bool = False 59 ) -> None: 60 self._pyproject: dict | None = None 61 self._lockfile: dict | None = None 62 self._environment: Environment | None = None 63 self._python: PythonInfo | None = None 64 self.core = core 65 66 if root_path is None: 67 root_path = ( 68 find_project_root(max_depth=self.global_config["project_max_depth"]) 69 if not is_global 70 else self.GLOBAL_PROJECT 71 ) 72 if not is_global and root_path is None and self.global_config["auto_global"]: 73 self.core.ui.echo( 74 "Project is not found, fallback to the global project", 75 fg="yellow", 76 err=True, 77 ) 78 root_path = self.GLOBAL_PROJECT 79 is_global = True 80 81 self.root = Path(root_path or "").absolute() 82 self.is_global = is_global 83 self.init_global_project() 84 85 def __repr__(self) -> str: 86 return f"<Project '{self.root.as_posix()}'>" 87 88 @property 89 def pyproject_file(self) -> Path: 90 return self.root / self.PYPROJECT_FILENAME 91 92 @property 93 def lockfile_file(self) -> Path: 94 return self.root / "pdm.lock" 95 96 @property 97 def pyproject(self) -> dict | None: 98 if not self._pyproject and self.pyproject_file.exists(): 99 data = atoml.parse(self.pyproject_file.read_text("utf-8")) 100 self._pyproject = cast(dict, data) 101 return self._pyproject 102 103 @pyproject.setter 104 def pyproject(self, data: dict[str, Any]) -> None: 105 self._pyproject = data 106 107 @property 108 def tool_settings(self) -> dict: 109 data = self.pyproject 110 if not data: 111 return {} 112 return data.setdefault("tool", {}).setdefault("pdm", {}) 113 114 @property 115 def lockfile(self) -> dict: 116 if not self._lockfile: 117 if not self.lockfile_file.is_file(): 118 raise ProjectError("Lock file does not exist.") 119 data = atoml.parse(self.lockfile_file.read_text("utf-8")) 120 self._lockfile = cast(dict, data) 121 return self._lockfile 122 123 @lockfile.setter 124 def lockfile(self, data: dict[str, Any]) -> None: 125 self._lockfile = data 126 127 @property 128 def config(self) -> dict[str, Any]: 129 """A read-only dict configuration, any modifications won't land in the file.""" 130 result = dict(self.global_config) 131 result.update(self.project_config) 132 return result 133 134 @property 135 def scripts(self) -> dict[str, str | dict[str, str]]: 136 return self.tool_settings.get("scripts") # type: ignore 137 138 @cached_property 139 def global_config(self) -> Config: 140 """Read-and-writable configuration dict for global settings""" 141 return Config(self.GLOBAL_PROJECT.with_name("config.toml"), is_global=True) 142 143 @cached_property 144 def project_config(self) -> Config: 145 """Read-and-writable configuration dict for project settings""" 146 return Config(self.root / ".pdm.toml") 147 148 @property 149 def python(self) -> PythonInfo: 150 if not self._python: 151 self._python = self.resolve_interpreter() 152 return self._python 153 154 @python.setter 155 def python(self, value: PythonInfo) -> None: 156 self._python = value 157 self.project_config["python.path"] = value.path 158 159 @property 160 def python_executable(self) -> str: 161 """For backward compatibility""" 162 return self.python.executable 163 164 def resolve_interpreter(self) -> PythonInfo: 165 """Get the Python interpreter path.""" 166 config = self.config 167 if self.project_config.get("python.path") and not os.getenv( 168 "PDM_IGNORE_SAVED_PYTHON" 169 ): 170 saved_path = self.project_config["python.path"] 171 try: 172 return PythonInfo.from_path(saved_path) 173 except (ValueError, FileNotFoundError): 174 del self.project_config["python.path"] 175 if os.name == "nt": 176 suffix = ".exe" 177 scripts = "Scripts" 178 else: 179 suffix = "" 180 scripts = "bin" 181 182 # Resolve virtual environments from env-vars 183 virtual_env = os.getenv("VIRTUAL_ENV", os.getenv("CONDA_PREFIX")) 184 if config["use_venv"] and virtual_env: 185 return PythonInfo.from_path( 186 os.path.join(virtual_env, scripts, f"python{suffix}") 187 ) 188 189 for py_version in self.find_interpreters(): 190 if self.python_requires.contains(str(py_version.version)): 191 self.python = py_version 192 return py_version 193 194 raise NoPythonVersion( 195 "No Python that satisfies {} is found on the system.".format( 196 self.python_requires 197 ) 198 ) 199 200 def get_environment(self) -> Environment: 201 """Get the environment selected by this project""" 202 if self.is_global: 203 env = GlobalEnvironment(self) 204 # Rewrite global project's python requires to be 205 # compatible with the exact version 206 env.python_requires = PySpecSet(f"=={self.python.version}") 207 return env 208 if self.config["use_venv"] and get_venv_like_prefix(self.python.executable): 209 # Only recognize venv created by python -m venv and virtualenv>20 210 return GlobalEnvironment(self) 211 return Environment(self) 212 213 @property 214 def environment(self) -> Environment: 215 if not self._environment: 216 self._environment = self.get_environment() 217 return self._environment 218 219 @environment.setter 220 def environment(self, value: Environment) -> None: 221 self._environment = value 222 223 @property 224 def python_requires(self) -> PySpecSet: 225 return PySpecSet(self.meta.requires_python) 226 227 def get_dependencies(self, group: str | None = None) -> dict[str, Requirement]: 228 metadata = self.meta 229 optional_dependencies = metadata.get("optional-dependencies", {}) 230 dev_dependencies = self.tool_settings.get("dev-dependencies", {}) 231 if group in (None, "default"): 232 deps = metadata.get("dependencies", []) 233 else: 234 if group in optional_dependencies and group in dev_dependencies: 235 self.core.ui.echo( 236 f"The {group} group exists in both [optional-dependencies] " 237 "and [dev-dependencies], the former is taken.", 238 err=True, 239 fg="yellow", 240 ) 241 if group in optional_dependencies: 242 deps = optional_dependencies[group] 243 elif group in dev_dependencies: 244 deps = dev_dependencies[group] 245 else: 246 raise PdmUsageError(f"Non-exist group {group}") 247 result = {} 248 with cd(self.root): 249 for line in deps: 250 if line.startswith("-e "): 251 req = parse_requirement(line[3:].strip(), True) 252 else: 253 req = parse_requirement(line) 254 # make editable packages behind normal ones to override correctly. 255 result[req.identify()] = req 256 return result 257 258 @property 259 def dependencies(self) -> dict[str, Requirement]: 260 return self.get_dependencies() 261 262 @property 263 def dev_dependencies(self) -> dict[str, Requirement]: 264 """All development dependencies""" 265 dev_group = self.tool_settings.get("dev-dependencies", {}) 266 if not dev_group: 267 return {} 268 result = {} 269 with cd(self.root): 270 for _, deps in dev_group.items(): 271 for line in deps: 272 if line.startswith("-e "): 273 req = parse_requirement(line[3:].strip(), True) 274 else: 275 req = parse_requirement(line) 276 result[req.identify()] = req 277 return result 278 279 def iter_groups(self) -> Iterable[str]: 280 groups = {"default"} 281 if self.meta.optional_dependencies: 282 groups.update(self.meta.optional_dependencies.keys()) 283 if self.tool_settings.get("dev-dependencies"): 284 groups.update(self.tool_settings["dev-dependencies"].keys()) 285 return groups 286 287 @property 288 def all_dependencies(self) -> dict[str, dict[str, Requirement]]: 289 return {group: self.get_dependencies(group) for group in self.iter_groups()} 290 291 @property 292 def allow_prereleases(self) -> bool | None: 293 return self.tool_settings.get("allow_prereleases") 294 295 @property 296 def sources(self) -> list[Source]: 297 sources = list(self.tool_settings.get("source", [])) 298 if all(source.get("name") != "pypi" for source in sources): 299 sources.insert( 300 0, 301 { 302 "url": self.config["pypi.url"], 303 "verify_ssl": self.config["pypi.verify_ssl"], 304 "name": "pypi", 305 }, 306 ) 307 expanded_sources: list[Source] = [ 308 Source( 309 url=expand_env_vars_in_auth(s["url"]), 310 verify_ssl=s.get("verify_ssl", True), 311 name=s.get("name", urlparse(s["url"]).hostname), 312 type=s.get("type", "index"), 313 ) 314 for s in sources 315 ] 316 return expanded_sources 317 318 def get_repository(self, cls: Type[BaseRepository] | None = None) -> BaseRepository: 319 """Get the repository object""" 320 if cls is None: 321 cls = PyPIRepository 322 sources = self.sources or [] 323 return cls(sources, self.environment) 324 325 @property 326 def locked_repository(self) -> LockedRepository: 327 import copy 328 329 try: 330 lockfile = copy.deepcopy(self.lockfile) 331 except ProjectError: 332 lockfile = {} 333 334 return LockedRepository(lockfile, self.sources, self.environment) 335 336 def get_provider( 337 self, 338 strategy: str = "all", 339 tracked_names: Iterable[str] | None = None, 340 for_install: bool = False, 341 ) -> BaseProvider: 342 """Build a provider class for resolver. 343 344 :param strategy: the resolve strategy 345 :param tracked_names: the names of packages that needs to update 346 :param for_install: if the provider is for install 347 :returns: The provider object 348 """ 349 350 from pdm.resolver.providers import ( 351 BaseProvider, 352 EagerUpdateProvider, 353 ReusePinProvider, 354 ) 355 356 repository = self.get_repository(cls=self.core.repository_class) 357 allow_prereleases = self.allow_prereleases 358 if strategy != "all" and not self.is_lockfile_compatible(): 359 self.core.ui.echo( 360 "Updating the whole lock file as it is not compatible with PDM", 361 fg="yellow", 362 err=True, 363 ) 364 strategy = "all" 365 if not for_install and strategy == "all": 366 return BaseProvider(repository, allow_prereleases) 367 368 locked_repository = self.locked_repository 369 if for_install: 370 return BaseProvider(locked_repository, allow_prereleases) 371 provider_class = ( 372 ReusePinProvider if strategy == "reuse" else EagerUpdateProvider 373 ) 374 return provider_class( 375 locked_repository.all_candidates, 376 tracked_names or (), 377 repository, 378 allow_prereleases, 379 ) 380 381 def get_reporter( 382 self, 383 requirements: list[Requirement], 384 tracked_names: Iterable[str] | None = None, 385 spinner: halo.Halo | termui.DummySpinner | None = None, 386 ) -> BaseReporter: 387 """Return the reporter object to construct a resolver. 388 389 :param requirements: requirements to resolve 390 :param tracked_names: the names of packages that needs to update 391 :param spinner: optional spinner object 392 :returns: a reporter 393 """ 394 from pdm.resolver.reporters import SpinnerReporter 395 396 return SpinnerReporter(spinner or termui.DummySpinner(), requirements) 397 398 def get_lock_metadata(self) -> dict[str, Any]: 399 content_hash = atoml.string("sha256:" + self.get_content_hash("sha256")) 400 content_hash.trivia.trail = "\n\n" 401 return {"lock_version": self.LOCKFILE_VERSION, "content_hash": content_hash} 402 403 def write_lockfile( 404 self, toml_data: dict, show_message: bool = True, write: bool = True 405 ) -> None: 406 toml_data["metadata"].update(self.get_lock_metadata()) 407 408 if write: 409 with atomic_open_for_write(self.lockfile_file) as fp: 410 atoml.dump(toml_data, fp) # type: ignore 411 if show_message: 412 self.core.ui.echo(f"Changes are written to {termui.green('pdm.lock')}.") 413 self._lockfile = None 414 else: 415 self._lockfile = toml_data 416 417 def make_self_candidate(self, editable: bool = True) -> Candidate: 418 req = parse_requirement(pip_shims.path_to_url(self.root.as_posix()), editable) 419 req.name = self.meta.name 420 return Candidate( 421 req, self.environment, name=self.meta.name, version=self.meta.version 422 ) 423 424 def get_content_hash(self, algo: str = "md5") -> str: 425 # Only calculate sources and dependencies groups. Otherwise lock file is 426 # considered as unchanged. 427 dump_data = { 428 "sources": self.tool_settings.get("source", []), 429 "dependencies": self.meta.get("dependencies", []), 430 "dev-dependencies": self.tool_settings.get("dev-dependencies", {}), 431 "optional-dependencies": self.meta.get("optional-dependencies", {}), 432 "requires-python": self.meta.get("requires-python", ""), 433 } 434 pyproject_content = json.dumps(dump_data, sort_keys=True) 435 hasher = hashlib.new(algo) 436 hasher.update(pyproject_content.encode("utf-8")) 437 return hasher.hexdigest() 438 439 def is_lockfile_hash_match(self) -> bool: 440 if not self.lockfile_file.exists(): 441 return False 442 hash_in_lockfile = str( 443 self.lockfile.get("metadata", {}).get("content_hash", "") 444 ) 445 if not hash_in_lockfile: 446 return False 447 algo, hash_value = hash_in_lockfile.split(":") 448 content_hash = self.get_content_hash(algo) 449 return content_hash == hash_value 450 451 def is_lockfile_compatible(self) -> bool: 452 if not self.lockfile_file.exists(): 453 return True 454 lockfile_version = str( 455 self.lockfile.get("metadata", {}).get("lock_version", "") 456 ) 457 if not lockfile_version: 458 return False 459 if "." not in lockfile_version: 460 lockfile_version += ".0" 461 accepted = get_specifier(f"~={lockfile_version}") 462 return accepted.contains(self.LOCKFILE_VERSION) 463 464 def get_pyproject_dependencies(self, group: str, dev: bool = False) -> list[str]: 465 """Get the dependencies array in the pyproject.toml""" 466 if group == "default": 467 return self.meta.setdefault("dependencies", []) 468 else: 469 deps_dict = { 470 False: self.meta.setdefault("optional-dependencies", {}), 471 True: self.tool_settings.setdefault("dev-dependencies", {}), 472 } 473 for deps in deps_dict.values(): 474 if group in deps: 475 return deps[group] 476 return deps_dict[dev].setdefault(group, []) 477 478 def add_dependencies( 479 self, 480 requirements: dict[str, Requirement], 481 to_group: str = "default", 482 dev: bool = False, 483 show_message: bool = True, 484 ) -> None: 485 deps = self.get_pyproject_dependencies(to_group, dev).multiline( # type: ignore 486 True 487 ) 488 for _, dep in requirements.items(): 489 matched_index = next( 490 (i for i, r in enumerate(deps) if dep.matches(r)), None 491 ) 492 if matched_index is None: 493 deps.append(dep.as_line()) 494 else: 495 req = dep.as_line() 496 deps[matched_index] = req 497 self.write_pyproject(show_message) 498 499 def write_pyproject(self, show_message: bool = True) -> None: 500 with atomic_open_for_write( 501 self.pyproject_file.as_posix(), encoding="utf-8" 502 ) as f: 503 atoml.dump(self.pyproject, f) # type: ignore 504 if show_message: 505 self.core.ui.echo( 506 f"Changes are written to {termui.green('pyproject.toml')}." 507 ) 508 self._pyproject = None 509 510 @property 511 def meta(self) -> Metadata: 512 if not self.pyproject: 513 self.pyproject = {"project": atoml.table()} 514 project_meta = self.pyproject["project"] 515 meta_version = project_meta.get("version") 516 updated = False 517 # Move version to tool table 518 if isinstance(meta_version, dict): 519 if "version" in self.tool_settings: 520 self.core.ui.echo( 521 "WARNING: Removing dynamic `version` from [project] table", 522 fg="yellow", 523 err=True, 524 ) 525 else: 526 self.core.ui.echo( 527 "WARNING: Moving dynamic `version` from [project] to [tool.pdm]", 528 fg="yellow", 529 err=True, 530 ) 531 self.tool_settings["version"] = meta_version 532 del project_meta["version"] 533 updated = True 534 # Delete classifiers from dynamic 535 dynamic_fields = project_meta.get("dynamic", []) 536 if "classifiers" in dynamic_fields: 537 self.core.ui.echo( 538 "WARNING: Dynamic `classifiers` is no longer supported, " 539 "please supply all classifiers manually", 540 fg="yellow", 541 err=True, 542 ) 543 dynamic_fields.remove("classifiers") 544 if not dynamic_fields: 545 del project_meta["dynamic"] 546 updated = True 547 if updated: 548 self.write_pyproject(False) 549 m = Metadata(self.pyproject_file, False) 550 m._metadata = self.pyproject.get("project", {}) 551 m._tool_settings = self.tool_settings 552 return m 553 554 def init_global_project(self) -> None: 555 if not self.is_global: 556 return 557 if not self.pyproject_file.exists(): 558 self.root.mkdir(parents=True, exist_ok=True) 559 self.pyproject_file.write_text( 560 """\ 561[project] 562dependencies = ["pip", "setuptools", "wheel"] 563""" 564 ) 565 self._pyproject = None 566 567 @property 568 def cache_dir(self) -> Path: 569 return Path(self.config.get("cache_dir", "")) 570 571 def cache(self, name: str) -> Path: 572 path = self.cache_dir / name 573 path.mkdir(parents=True, exist_ok=True) 574 return path 575 576 def make_wheel_cache(self) -> pip_shims.WheelCache: 577 return pip_shims.WheelCache( 578 self.cache_dir.as_posix(), pip_shims.FormatControl(set(), set()) 579 ) 580 581 def make_candidate_info_cache(self) -> CandidateInfoCache: 582 583 python_hash = hashlib.sha1( 584 str(self.environment.python_requires).encode() 585 ).hexdigest() 586 file_name = f"package_meta_{python_hash}.json" 587 return CandidateInfoCache(self.cache("metadata") / file_name) 588 589 def make_hash_cache(self) -> HashCache: 590 return HashCache(directory=self.cache("hashes").as_posix()) 591 592 def find_interpreters(self, python_spec: str | None = None) -> Iterable[PythonInfo]: 593 """Return an iterable of interpreter paths that matches the given specifier, 594 which can be: 595 1. a version specifier like 3.7 596 2. an absolute path 597 3. a short name like python3 598 4. None that returns all possible interpreters 599 """ 600 config = self.config 601 python: str | Path | None = None 602 603 if not python_spec: 604 if config.get("python.use_pyenv", True) and PYENV_INSTALLED: 605 pyenv_shim = os.path.join(PYENV_ROOT, "shims", "python3") 606 if os.name == "nt": 607 pyenv_shim += ".bat" 608 if os.path.exists(pyenv_shim): 609 yield PythonInfo.from_path(pyenv_shim) 610 elif os.path.exists(pyenv_shim.replace("python3", "python")): 611 yield PythonInfo.from_path(pyenv_shim.replace("python3", "python")) 612 if config.get("use_venv"): 613 python = get_in_project_venv_python(self.root) 614 if python: 615 yield PythonInfo.from_path(python) 616 python = shutil.which("python") 617 if python: 618 yield PythonInfo.from_path(python) 619 args = [] 620 else: 621 if not all(c.isdigit() for c in python_spec.split(".")): 622 if Path(python_spec).exists(): 623 python = find_python_in_path(python_spec) 624 if python: 625 yield PythonInfo.from_path(python) 626 else: 627 python = shutil.which(python_spec) 628 if python: 629 yield PythonInfo.from_path(python) 630 return 631 args = [int(v) for v in python_spec.split(".") if v != ""] 632 finder = Finder() 633 for entry in finder.find_all_python_versions(*args): 634 yield PythonInfo.from_python_version(entry.py_version) 635 if not python_spec: 636 this_python = getattr(sys, "_base_executable", sys.executable) 637 yield PythonInfo.from_path(this_python) 638