1"""Cache Management
2"""
3
4import hashlib
5import json
6import logging
7import os
8
9from pip._vendor.packaging.tags import interpreter_name, interpreter_version
10from pip._vendor.packaging.utils import canonicalize_name
11
12from pip._internal.exceptions import InvalidWheelFilename
13from pip._internal.models.link import Link
14from pip._internal.models.wheel import Wheel
15from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds
16from pip._internal.utils.typing import MYPY_CHECK_RUNNING
17from pip._internal.utils.urls import path_to_url
18
19if MYPY_CHECK_RUNNING:
20    from typing import Any, Dict, List, Optional, Set
21
22    from pip._vendor.packaging.tags import Tag
23
24    from pip._internal.models.format_control import FormatControl
25
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
27
28
def _hash_dict(d):
    # type: (Dict[str, str]) -> str
    """Return a stable sha224 hex digest of a dictionary.

    The dictionary is serialized to compact JSON with sorted keys and
    ASCII-only output, so two dictionaries with equal contents always
    produce the same digest regardless of insertion order.
    """
    serialized = json.dumps(
        d,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=True,
    )
    digest = hashlib.sha224()
    # ensure_ascii=True guarantees the text is encodable as plain ASCII.
    digest.update(serialized.encode("ascii"))
    return digest.hexdigest()
34
35
class Cache(object):
    """Abstract base class that maps links to cache directories.

    Subclasses provide the directory layout (``get_path_for_link`` and
    ``get_path_for_link_legacy``) and the lookup itself (``get``).

        :param cache_dir: The root of the cache.
        :param format_control: An object of FormatControl class to limit
            binaries being read from the cache.
        :param allowed_formats: which formats of files the cache should store.
            ('binary' and 'source' are the only allowed values)
    """

    def __init__(self, cache_dir, format_control, allowed_formats):
        # type: (str, FormatControl, Set[str]) -> None
        super(Cache, self).__init__()
        # A falsy cache_dir is accepted (and stored as None); any other value
        # has to be an absolute path.
        assert not cache_dir or os.path.isabs(cache_dir)
        self.cache_dir = cache_dir or None
        self.format_control = format_control
        self.allowed_formats = allowed_formats

        # Only the two known format names may be requested.
        known_formats = {"source", "binary"}
        assert self.allowed_formats.issubset(known_formats)

    def _get_cache_path_parts_legacy(self, link):
        # type: (Link) -> List[str]
        """Get parts of path that must be os.path.joined with cache_dir

        This is the legacy cache key (pip < 20), kept so that entries written
        by older caches can still be located.
        """
        # The key is the URL without its fragment, optionally followed by
        # "#<hash_name>=<hash>". Anything else the fragment may have carried
        # is deliberately left out of the key.
        pieces = [link.url_without_fragment]
        if link.hash_name is not None and link.hash is not None:
            pieces.append(link.hash_name + "=" + link.hash)

        # sha224 gives a shorter digest than sha256, which is sufficient for
        # naming cache directories.
        digest = hashlib.sha224("#".join(pieces).encode()).hexdigest()

        # Nest three levels of two-character directories above the remainder
        # of the digest, so no single directory collects too many children.
        return [digest[:2], digest[2:4], digest[4:6], digest[6:]]

    def _get_cache_path_parts(self, link):
        # type: (Link) -> List[str]
        """Get parts of path that must be os.path.joined with cache_dir
        """
        # Build the key from the URL without its fragment plus the pieces of
        # the fragment that matter (hash and subdirectory); the rest of the
        # fragment is ignored on purpose.
        key = {"url": link.url_without_fragment}
        if link.hash_name is not None and link.hash is not None:
            key[link.hash_name] = link.hash
        if link.subdirectory_fragment:
            key["subdirectory"] = link.subdirectory_fragment

        # The interpreter name and version are part of the key to cope with
        # ill-behaved sdists that build a different wheel depending on the
        # python version their setup.py runs on, without encoding that
        # difference in compatibility tags.
        # https://github.com/pypa/pip/issues/7296
        key.update(
            interpreter_name=interpreter_name(),
            interpreter_version=interpreter_version(),
        )

        # _hash_dict yields a sha224 hex digest of the key dictionary.
        digest = _hash_dict(key)

        # Nest three levels of two-character directories above the remainder
        # of the digest, so no single directory collects too many children.
        return [digest[:2], digest[2:4], digest[4:6], digest[6:]]

    def _get_candidates(self, link, canonical_package_name):
        # type: (Link, str) -> List[Any]
        """Return ``(entry_name, directory)`` pairs found in the cache for link.

        Entries from the current cache layout come first, followed by the
        entries from the legacy layout. An empty list is returned when there
        is no cache directory, no package name, no link, or when none of the
        formats allowed for the package is handled by this cache.
        """
        if not self.cache_dir or not canonical_package_name or not link:
            return []

        permitted = self.format_control.get_allowed_formats(
            canonical_package_name
        )
        if not self.allowed_formats.intersection(permitted):
            return []

        found = []
        # TODO remove legacy path lookup in pip>=21
        for resolve_dir in (
            self.get_path_for_link,
            self.get_path_for_link_legacy,
        ):
            directory = resolve_dir(link)
            if os.path.isdir(directory):
                found.extend(
                    (entry, directory) for entry in os.listdir(directory)
                )
        return found

    def get_path_for_link_legacy(self, link):
        # type: (Link) -> str
        """Return the legacy-layout directory for link (abstract)."""
        raise NotImplementedError()

    def get_path_for_link(self, link):
        # type: (Link) -> str
        """Return a directory to store cached items in for link.
        """
        raise NotImplementedError()

    def get(self, link, package_name, supported_tags):
        # type: (Link, Optional[str], List[Tag]) -> Link
        """Returns a link to a cached item if it exists, otherwise returns the
        passed link.
        """
        raise NotImplementedError()
170
171
class SimpleWheelCache(Cache):
    """A cache of wheels for future installs.

    Wheels are kept below the ``wheels`` directory of the cache root.
    """

    def __init__(self, cache_dir, format_control):
        # type: (str, FormatControl) -> None
        # This cache only ever deals with the "binary" format.
        super(SimpleWheelCache, self).__init__(
            cache_dir, format_control, {"binary"}
        )

    def get_path_for_link_legacy(self, link):
        # type: (Link) -> str
        """Return the legacy-layout directory holding cached wheels for link."""
        legacy_parts = self._get_cache_path_parts_legacy(link)
        assert self.cache_dir
        return os.path.join(self.cache_dir, "wheels", *legacy_parts)

    def get_path_for_link(self, link):
        # type: (Link) -> str
        """Return a directory to store cached wheels for link

        One sdist can yield many wheels, so each link gets a directory of its
        own; that directory is what gets consulted when looking for cache
        hits.

        Things are only inserted into the cache when they have plausible
        version numbers, to avoid contaminating the cache with things that
        were not unique. E.g. ./package might be installed dozens of times
        while always building version 0.0; had a wheel been built and cached
        for it, the same wheel would keep being used even after the source
        was edited.

        :param link: The link of the sdist for which this will cache wheels.
        """
        key_parts = self._get_cache_path_parts(link)
        assert self.cache_dir
        # Wheels live within the root cache_dir
        return os.path.join(self.cache_dir, "wheels", *key_parts)

    def get(self, link, package_name, supported_tags):
        # type: (Link, Optional[str], List[Tag]) -> Link
        """Return a link to the best matching cached wheel, or ``link`` itself.

        Cached files are skipped when their name is not a valid wheel
        filename, when they belong to a different distribution, or when they
        are not supported by ``supported_tags``. Among the remaining wheels
        the one with the smallest ``(support index, filename, directory)``
        tuple wins.
        """
        if not package_name:
            return link

        wanted_name = canonicalize_name(package_name)
        ranked = []
        for wheel_name, wheel_dir in self._get_candidates(link, wanted_name):
            try:
                wheel = Wheel(wheel_name)
            except InvalidWheelFilename:
                continue
            if canonicalize_name(wheel.name) != wanted_name:
                logger.debug(
                    "Ignoring cached wheel %s for %s as it "
                    "does not match the expected distribution name %s.",
                    wheel_name, link, package_name,
                )
                continue
            # Wheels built for a different python/arch/etc are left out.
            if wheel.supported(supported_tags):
                rank = wheel.support_index_min(supported_tags)
                ranked.append((rank, wheel_name, wheel_dir))

        if ranked:
            best = min(ranked)
            return Link(path_to_url(os.path.join(best[2], best[1])))
        return link
252
253
class EphemWheelCache(SimpleWheelCache):
    """A SimpleWheelCache that creates its own temporary cache directory
    """

    def __init__(self, format_control):
        # type: (FormatControl) -> None
        ephem_dir = TempDirectory(
            kind=tempdir_kinds.EPHEM_WHEEL_CACHE,
            globally_managed=True,
        )
        # Keep a reference to the TempDirectory object on the instance.
        self._temp_dir = ephem_dir

        # The temporary directory's path serves as the cache root.
        super(EphemWheelCache, self).__init__(ephem_dir.path, format_control)
268
269
class CacheEntry(object):
    """A cached link together with a flag telling which cache held it.

    ``persistent`` is True when the link was found in the persistent cache
    and False when it was found in the ephemeral one.
    """

    def __init__(self, link, persistent):
        # type: (Link, bool) -> None
        self.link, self.persistent = link, persistent
278
279
class WheelCache(Cache):
    """Wraps EphemWheelCache and SimpleWheelCache into a single Cache

    This Cache allows for graceful degradation: the simple wheel cache is
    consulted first, and the ephem wheel cache is used when a link is not
    found there.
    """

    def __init__(self, cache_dir, format_control):
        # type: (str, FormatControl) -> None
        super(WheelCache, self).__init__(
            cache_dir, format_control, {"binary"}
        )
        self._wheel_cache = SimpleWheelCache(cache_dir, format_control)
        self._ephem_cache = EphemWheelCache(format_control)

    def get_path_for_link_legacy(self, link):
        # type: (Link) -> str
        """Return the legacy-layout directory of the simple wheel cache."""
        return self._wheel_cache.get_path_for_link_legacy(link)

    def get_path_for_link(self, link):
        # type: (Link) -> str
        """Return the directory for link inside the simple wheel cache."""
        return self._wheel_cache.get_path_for_link(link)

    def get_ephem_path_for_link(self, link):
        # type: (Link) -> str
        """Return the directory for link inside the ephem wheel cache."""
        return self._ephem_cache.get_path_for_link(link)

    def get(self, link, package_name, supported_tags):
        # type: (Link, Optional[str], List[Tag]) -> Link
        """Return the cached link for ``link`` if either cache has one,
        otherwise return ``link`` itself.
        """
        entry = self.get_cache_entry(link, package_name, supported_tags)
        return link if entry is None else entry.link

    def get_cache_entry(self, link, package_name, supported_tags):
        # type: (Link, Optional[str], List[Tag]) -> Optional[CacheEntry]
        """Returns a CacheEntry with a link to a cached item if it exists or
        None. The cache entry indicates if the item was found in the persistent
        or ephemeral cache.
        """
        # Lookup order matters: the simple wheel cache (persistent) is asked
        # before the ephem wheel cache.
        lookups = (
            (self._wheel_cache, True),
            (self._ephem_cache, False),
        )
        for cache, is_persistent in lookups:
            found = cache.get(
                link=link,
                package_name=package_name,
                supported_tags=supported_tags,
            )
            # A cache reports a miss by handing back the very same link
            # object, hence the identity comparison.
            if found is not link:
                return CacheEntry(found, persistent=is_persistent)

        return None
347