import hashlib
import itertools
import logging
import optparse
import os
from contextlib import contextmanager
from shutil import rmtree
from typing import (
    Any,
    BinaryIO,
    ContextManager,
    Dict,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Set,
)

from click import progressbar
from pip._internal.cache import WheelCache
from pip._internal.cli.progress_bars import BAR_TYPES
from pip._internal.commands import create_command
from pip._internal.commands.install import InstallCommand
from pip._internal.index.package_finder import PackageFinder
from pip._internal.models.candidate import InstallationCandidate
from pip._internal.models.index import PackageIndex
from pip._internal.models.link import Link
from pip._internal.models.wheel import Wheel
from pip._internal.network.session import PipSession
from pip._internal.req import InstallRequirement, RequirementSet
from pip._internal.req.req_tracker import get_requirement_tracker
from pip._internal.utils.hashes import FAVORITE_HASH
from pip._internal.utils.logging import indent_log, setup_logging
from pip._internal.utils.misc import normalize_path
from pip._internal.utils.temp_dir import TempDirectory, global_tempdir_manager
from pip._internal.utils.urls import path_to_url, url_to_path
from pip._vendor.packaging.tags import Tag
from pip._vendor.packaging.version import _BaseVersion
from pip._vendor.requests import RequestException, Session

from .._compat import contextlib
from ..exceptions import NoCandidateFound
from ..logging import log
from ..utils import (
    as_tuple,
    is_pinned_requirement,
    is_url_requirement,
    lookup_table,
    make_install_requirement,
)
from .base import BaseRepository

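# Size (in bytes) of the chunks read when hashing package files in
# PyPIRepository._get_file_hash().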
FILE_CHUNK_SIZE = 4096


class FileStream(NamedTuple):
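    """A readable binary stream paired with its total size in bytes, if known."""
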
    stream: BinaryIO
    size: Optional[float]


class PyPIRepository(BaseRepository):
    """
    The PyPIRepository will use the provided Finder instance to look up
    packages.  Typically, it looks up packages on PyPI (the default implicit
    config), but any other PyPI mirror can be used if index_urls is
    changed/configured on the Finder.
    """

    HASHABLE_PACKAGE_TYPES = {"bdist_wheel", "sdist"}

    def __init__(self, pip_args: List[str], cache_dir: str):
        # Use pip's parser for pip.conf management and defaults.
        # General options (find_links, index_url, extra_index_url, trusted_host,
        # and pre) are deferred to pip.
        self.command: InstallCommand = create_command("install")
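        # Force pip's legacy resolver; resolve_reqs() below relies on its
        # internals (Resolver._resolve_one / _get_dist_for).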
        extra_pip_args = ["--use-deprecated", "legacy-resolver"]

        options, _ = self.command.parse_args(pip_args + extra_pip_args)
        if options.cache_dir:
            options.cache_dir = normalize_path(options.cache_dir)
        options.require_hashes = False
        options.ignore_dependencies = False

        self._options: optparse.Values = options
        self._session = self.command._build_session(options)
        self._finder = self.command._build_package_finder(
            options=options, session=self.session
        )

        # Caches
        # stores project_name => InstallationCandidate mappings for all
        # versions reported by PyPI, so we only have to ask once for each
        # project
        self._available_candidates_cache: Dict[str, List[InstallationCandidate]] = {}

        # stores InstallRequirement => list(InstallRequirement) mappings
        # of all secondary dependencies for the given requirement, so we
        # only have to go to disk once for each requirement
        self._dependencies_cache: Dict[InstallRequirement, Set[InstallRequirement]] = {}

        # Set up file paths
        self._cache_dir = normalize_path(str(cache_dir))
        self._download_dir = os.path.join(self._cache_dir, "pkgs")

        self._setup_logging()

    def clear_caches(self) -> None:
        rmtree(self._download_dir, ignore_errors=True)

    @property
    def options(self) -> optparse.Values:
        return self._options

    @property
    def session(self) -> PipSession:
        return self._session

    @property
    def finder(self) -> PackageFinder:
        return self._finder

    def find_all_candidates(self, req_name: str) -> List[InstallationCandidate]:
        if req_name not in self._available_candidates_cache:
            candidates = self.finder.find_all_candidates(req_name)
            self._available_candidates_cache[req_name] = candidates
        return self._available_candidates_cache[req_name]

    def find_best_match(
        self, ireq: InstallRequirement, prereleases: Optional[bool] = None
    ) -> InstallRequirement:
        """
        Returns a pinned InstallRequirement object that indicates the best match
        for the given InstallRequirement according to the external repository.
        """
        if ireq.editable or is_url_requirement(ireq):
            return ireq  # return itself as the best match

        all_candidates = self.find_all_candidates(ireq.name)
        candidates_by_version = lookup_table(all_candidates, key=candidate_version)
        matching_versions = ireq.specifier.filter(
            (candidate.version for candidate in all_candidates), prereleases=prereleases
        )

        matching_candidates = list(
            itertools.chain.from_iterable(
                candidates_by_version[ver] for ver in matching_versions
            )
        )
        if not matching_candidates:
            raise NoCandidateFound(ireq, all_candidates, self.finder)

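        # Let pip's CandidateEvaluator pick the preferred candidate among the
        # matching ones (e.g. preferring compatible wheels over sdists).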
        evaluator = self.finder.make_candidate_evaluator(ireq.name)
        best_candidate_result = evaluator.compute_best_candidate(matching_candidates)
        best_candidate = best_candidate_result.best_candidate

        # Turn the candidate into a pinned InstallRequirement
        return make_install_requirement(
            best_candidate.name,
            best_candidate.version,
            ireq,
        )

    def resolve_reqs(
        self,
        download_dir: Optional[str],
        ireq: InstallRequirement,
        wheel_cache: WheelCache,
    ) -> Set[InstallRequirement]:
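        """
        Resolve the given requirement using pip's legacy resolver internals
        and return the set of InstallRequirements that pip reports as its
        dependencies.
        """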
        with get_requirement_tracker() as req_tracker, TempDirectory(
            kind="resolver"
        ) as temp_dir, indent_log():
            preparer_kwargs = {
                "temp_build_dir": temp_dir,
                "options": self.options,
                "req_tracker": req_tracker,
                "session": self.session,
                "finder": self.finder,
                "use_user_site": False,
                "download_dir": download_dir,
            }
            preparer = self.command.make_requirement_preparer(**preparer_kwargs)

            reqset = RequirementSet()
            ireq.user_supplied = True
            reqset.add_requirement(ireq)

            resolver = self.command.make_resolver(
                preparer=preparer,
                finder=self.finder,
                options=self.options,
                wheel_cache=wheel_cache,
                use_user_site=False,
                ignore_installed=True,
                ignore_requires_python=False,
                force_reinstall=False,
                upgrade_strategy="to-satisfy-only",
            )
            results = resolver._resolve_one(reqset, ireq)
            if not ireq.prepared:
                # If still not prepared, e.g. a constraint, do enough to assign
                # the ireq a name:
                resolver._get_dist_for(ireq)

        return set(results)

    def get_dependencies(self, ireq: InstallRequirement) -> Set[InstallRequirement]:
        """
        Given a pinned, URL, or editable InstallRequirement, returns a set of
        dependencies (also InstallRequirements, but not necessarily pinned).
        They indicate the secondary dependencies for the given requirement.
        """
        if not (
            ireq.editable or is_url_requirement(ireq) or is_pinned_requirement(ireq)
        ):
            raise TypeError(
                f"Expected url, pinned or editable InstallRequirement, got {ireq}"
            )

        if ireq not in self._dependencies_cache:
            if ireq.editable and (ireq.source_dir and os.path.exists(ireq.source_dir)):
                # No download_dir for locally available editable requirements.
                # If a download_dir is passed, pip will unnecessarily archive
                # the entire source directory
                download_dir = None
            elif ireq.link and ireq.link.is_vcs:
                # No download_dir for VCS sources.  This also works around pip
                # using git-checkout-index, which gets rid of the .git dir.
                download_dir = None
            else:
                download_dir = self._get_download_path(ireq)
                os.makedirs(download_dir, exist_ok=True)

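            # global_tempdir_manager() makes sure pip's globally-managed temp
            # directories created during the resolve are cleaned up on exit.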
            with global_tempdir_manager():
                wheel_cache = WheelCache(self._cache_dir, self.options.format_control)
                self._dependencies_cache[ireq] = self.resolve_reqs(
                    download_dir, ireq, wheel_cache
                )

        return self._dependencies_cache[ireq]

    def _get_project(self, ireq: InstallRequirement) -> Any:
        """
        Return a dict of project info from the PyPI JSON API for a given
        InstallRequirement. Return None on HTTP/JSON errors or if the package
        is not found on the PyPI server.

        API reference: https://warehouse.readthedocs.io/api-reference/json/
        """
        package_indexes = (
            PackageIndex(url=index_url, file_storage_domain="")
            for index_url in self.finder.search_scope.index_urls
        )
        for package_index in package_indexes:
            url = f"{package_index.pypi_url}/{ireq.name}/json"
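            # With the default index this becomes e.g.
            # "https://pypi.org/pypi/<project>/json"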
            try:
                response = self.session.get(url)
            except RequestException as e:
                log.debug(f"Fetching package info from PyPI failed: {url}: {e}")
                continue

            # Skip this PyPI server: either the package is missing or the
            # JSON API is not supported
            if response.status_code == 404:
                continue

            try:
                data = response.json()
            except ValueError as e:
                log.debug(f"Cannot parse JSON response from PyPI: {url}: {e}")
                continue
            return data
        return None

    def _get_download_path(self, ireq: InstallRequirement) -> str:
        """
        Determine the download dir location in a way which avoids name
        collisions.
        """
        if ireq.link:
            salt = hashlib.sha224(ireq.link.url_without_fragment.encode()).hexdigest()
            # Nest directories to avoid running out of top level dirs on some FS
            # (see pip's _get_cache_path_parts, which inspired this)
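            # e.g. <cache_dir>/pkgs/ab/cd/ef/<rest of the sha224 digest>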
            return os.path.join(
                self._download_dir, salt[:2], salt[2:4], salt[4:6], salt[6:]
            )
        else:
            return self._download_dir

    def get_hashes(self, ireq: InstallRequirement) -> Set[str]:
        """
        Given an InstallRequirement, return a set of hashes that represent all
        of the files for a given requirement. Unhashable requirements return an
        empty set. Unpinned requirements raise a TypeError.
        """

        if ireq.link:
            link = ireq.link

            if link.is_vcs or (link.is_file and link.is_existing_dir()):
                # Return empty set for unhashable requirements.
                # Unhashable logic modeled on pip's
                # RequirementPreparer.prepare_linked_requirement
                return set()

            if is_url_requirement(ireq):
                # Directly hash URL requirements.
                # URL requirements may have been previously downloaded and cached
                # locally by self.resolve_reqs()
                cached_path = os.path.join(self._get_download_path(ireq), link.filename)
                if os.path.exists(cached_path):
                    cached_link = Link(path_to_url(cached_path))
                else:
                    cached_link = link
                return {self._get_file_hash(cached_link)}

        if not is_pinned_requirement(ireq):
            raise TypeError(f"Expected pinned requirement, got {ireq}")

        log.debug(ireq.name)

        with log.indentation():
            hashes = self._get_hashes_from_pypi(ireq)
            if hashes is None:
                log.debug(
                    "Couldn't get hashes from PyPI, falling back to hashing files"
                )
                return self._get_hashes_from_files(ireq)

        return hashes

    def _get_hashes_from_pypi(self, ireq: InstallRequirement) -> Optional[Set[str]]:
        """
        Return a set of hashes from the PyPI JSON API for a given
        InstallRequirement. Return None if fetching the data fails or the
        digests are missing.
        """
        project = self._get_project(ireq)
        if project is None:
            return None

        _, version, _ = as_tuple(ireq)

        try:
            release_files = project["releases"][version]
        except KeyError:
            log.debug("Missing release files on PyPI")
            return None

        try:
            hashes = {
                f"{FAVORITE_HASH}:{file_['digests'][FAVORITE_HASH]}"
                for file_ in release_files
                if file_["packagetype"] in self.HASHABLE_PACKAGE_TYPES
            }
        except KeyError:
            log.debug("Missing digests of release files on PyPI")
            return None

        return hashes

    def _get_hashes_from_files(self, ireq: InstallRequirement) -> Set[str]:
        """
        Return a set of hashes for all release files of a given InstallRequirement.
        """
        # We need to get all of the candidates that match our current version
        # pin; these will represent all of the files that could possibly
        # satisfy this constraint.
        all_candidates = self.find_all_candidates(ireq.name)
        candidates_by_version = lookup_table(all_candidates, key=candidate_version)
        matching_versions = list(
            ireq.specifier.filter(candidate.version for candidate in all_candidates)
        )
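        # ireq is pinned here (enforced by get_hashes), so matching_versions is
        # expected to contain a single version.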
        matching_candidates = candidates_by_version[matching_versions[0]]

        return {
            self._get_file_hash(candidate.link) for candidate in matching_candidates
        }

    def _get_file_hash(self, link: Link) -> str:
        log.debug(f"Hashing {link.show_url}")
        h = hashlib.new(FAVORITE_HASH)
        with open_local_or_remote_file(link, self.session) as f:
            # Chunks to iterate
            chunks = iter(lambda: f.stream.read(FILE_CHUNK_SIZE), b"")

            # Choose a context manager depending on verbosity
            context_manager: ContextManager[Iterator[bytes]]
            if log.verbosity >= 1:
                iter_length = int(f.size / FILE_CHUNK_SIZE) if f.size else None
                bar_template = f"{' ' * log.current_indent}  |%(bar)s| %(info)s"
                context_manager = progressbar(
                    chunks,
                    length=iter_length,
                    # Make it look like default pip progress bar
                    fill_char="█",
                    empty_char=" ",
                    bar_template=bar_template,
                    width=32,
                )
            else:
                context_manager = contextlib.nullcontext(chunks)

            # Iterate over the chosen context manager
            with context_manager as bar:
                for chunk in bar:
                    h.update(chunk)
        return ":".join([FAVORITE_HASH, h.hexdigest()])

    @contextmanager
    def allow_all_wheels(self) -> Iterator[None]:
        """
        Monkey patches pip.Wheel to allow wheels from all platforms and Python versions.

        This also saves the candidate cache and sets a new one, or else the results
        from the previous non-patched calls will interfere.
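
        Example (names are illustrative)::

            with repository.allow_all_wheels():
                candidates = repository.find_all_candidates("some-package")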
        """

        def _wheel_supported(self: Wheel, tags: List[Tag]) -> bool:
            # Ignore current platform. Support everything.
            return True

        def _wheel_support_index_min(self: Wheel, tags: List[Tag]) -> int:
            # All wheels are equal priority for sorting.
            return 0

        original_wheel_supported = Wheel.supported
        original_support_index_min = Wheel.support_index_min
        original_cache = self._available_candidates_cache

        Wheel.supported = _wheel_supported
        Wheel.support_index_min = _wheel_support_index_min
        self._available_candidates_cache = {}

        try:
            yield
        finally:
            Wheel.supported = original_wheel_supported
            Wheel.support_index_min = original_support_index_min
            self._available_candidates_cache = original_cache

    def _setup_logging(self) -> None:
        """
        Set up pip's logger. Keep pip's verbosity in line with pip-tools' and
        sync pip's log stream with LogContext.stream.
        """
        # Pip's default logger is noisy, so decrease its verbosity
        setup_logging(
            verbosity=log.verbosity - 1,
            no_color=self.options.no_color,
            user_log_file=self.options.log,
        )

        # Sync pip's console handler stream with LogContext.stream
        logger = logging.getLogger()
        for handler in logger.handlers:
            if handler.name == "console":  # pragma: no branch
                assert isinstance(handler, logging.StreamHandler)
                handler.stream = log.stream
                break
        else:  # pragma: no cover
            # There is always a console handler. This warning would be a signal
            # that this block should be removed/revisited, because pip may have
            # refactored out its logging config.
            log.warning("Couldn't find a 'console' logging handler")

        # Sync pip's progress bars stream with LogContext.stream
        for bar_cls in itertools.chain(*BAR_TYPES.values()):
            bar_cls.file = log.stream


@contextmanager
def open_local_or_remote_file(link: Link, session: Session) -> Iterator[FileStream]:
    """
    Open a local or remote file for reading.

    :type link: pip.index.Link
    :type session: requests.Session
    :raises ValueError: If link points to a local directory.
    :return: a context manager yielding a FileStream with the opened file-like object
    """
    url = link.url_without_fragment

    if link.is_file:
        # Local URL
        local_path = url_to_path(url)
        if os.path.isdir(local_path):
            raise ValueError(f"Cannot open directory for read: {url}")
        else:
            st = os.stat(local_path)
            with open(local_path, "rb") as local_file:
                yield FileStream(stream=local_file, size=st.st_size)
    else:
        # Remote URL
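        # Request an uncompressed response so that response.raw yields the
        # file's original bytes and content-length matches what gets hashed.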
        headers = {"Accept-Encoding": "identity"}
        response = session.get(url, headers=headers, stream=True)

        # Content length must be int or None
        content_length: Optional[int]
        try:
            content_length = int(response.headers["content-length"])
        except (ValueError, KeyError, TypeError):
            content_length = None

        try:
            yield FileStream(stream=response.raw, size=content_length)
        finally:
            response.close()


def candidate_version(candidate: InstallationCandidate) -> _BaseVersion:
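    """Key function used with lookup_table() to group candidates by version."""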
    return candidate.version