1from __future__ import absolute_import
2
3import logging
4import os
5import textwrap
6
7import pip._internal.utils.filesystem as filesystem
8from pip._internal.cli.base_command import Command
9from pip._internal.cli.status_codes import ERROR, SUCCESS
10from pip._internal.exceptions import CommandError, PipError
11from pip._internal.utils.typing import MYPY_CHECK_RUNNING
12
13if MYPY_CHECK_RUNNING:
14    from optparse import Values
15    from typing import Any, List
16
17
18logger = logging.getLogger(__name__)
19
20
21class CacheCommand(Command):
22    """
23    Inspect and manage pip's wheel cache.
24
25    Subcommands:
26
27    - dir: Show the cache directory.
28    - info: Show information about the cache.
29    - list: List filenames of packages stored in the cache.
30    - remove: Remove one or more package from the cache.
31    - purge: Remove all items from the cache.
32
33    ``<pattern>`` can be a glob expression or a package name.
34    """
35
36    ignore_require_venv = True
37    usage = """
38        %prog dir
39        %prog info
40        %prog list [<pattern>] [--format=[human, abspath]]
41        %prog remove <pattern>
42        %prog purge
43    """
44
45    def add_options(self):
46        # type: () -> None
47
48        self.cmd_opts.add_option(
49            '--format',
50            action='store',
51            dest='list_format',
52            default="human",
53            choices=('human', 'abspath'),
54            help="Select the output format among: human (default) or abspath"
55        )
56
57        self.parser.insert_option_group(0, self.cmd_opts)
58
59    def run(self, options, args):
60        # type: (Values, List[Any]) -> int
61        handlers = {
62            "dir": self.get_cache_dir,
63            "info": self.get_cache_info,
64            "list": self.list_cache_items,
65            "remove": self.remove_cache_items,
66            "purge": self.purge_cache,
67        }
68
69        if not options.cache_dir:
70            logger.error("pip cache commands can not "
71                         "function since cache is disabled.")
72            return ERROR
73
74        # Determine action
75        if not args or args[0] not in handlers:
76            logger.error(
77                "Need an action (%s) to perform.",
78                ", ".join(sorted(handlers)),
79            )
80            return ERROR
81
82        action = args[0]
83
84        # Error handling happens here, not in the action-handlers.
85        try:
86            handlers[action](options, args[1:])
87        except PipError as e:
88            logger.error(e.args[0])
89            return ERROR
90
91        return SUCCESS
92
93    def get_cache_dir(self, options, args):
94        # type: (Values, List[Any]) -> None
95        if args:
96            raise CommandError('Too many arguments')
97
98        logger.info(options.cache_dir)
99
100    def get_cache_info(self, options, args):
101        # type: (Values, List[Any]) -> None
102        if args:
103            raise CommandError('Too many arguments')
104
105        num_http_files = len(self._find_http_files(options))
106        num_packages = len(self._find_wheels(options, '*'))
107
108        http_cache_location = self._cache_dir(options, 'http')
109        wheels_cache_location = self._cache_dir(options, 'wheels')
110        http_cache_size = filesystem.format_directory_size(http_cache_location)
111        wheels_cache_size = filesystem.format_directory_size(
112            wheels_cache_location
113        )
114
115        message = textwrap.dedent("""
116            Package index page cache location: {http_cache_location}
117            Package index page cache size: {http_cache_size}
118            Number of HTTP files: {num_http_files}
119            Wheels location: {wheels_cache_location}
120            Wheels size: {wheels_cache_size}
121            Number of wheels: {package_count}
122        """).format(
123            http_cache_location=http_cache_location,
124            http_cache_size=http_cache_size,
125            num_http_files=num_http_files,
126            wheels_cache_location=wheels_cache_location,
127            package_count=num_packages,
128            wheels_cache_size=wheels_cache_size,
129        ).strip()
130
131        logger.info(message)
132
133    def list_cache_items(self, options, args):
134        # type: (Values, List[Any]) -> None
135        if len(args) > 1:
136            raise CommandError('Too many arguments')
137
138        if args:
139            pattern = args[0]
140        else:
141            pattern = '*'
142
143        files = self._find_wheels(options, pattern)
144        if options.list_format == 'human':
145            self.format_for_human(files)
146        else:
147            self.format_for_abspath(files)
148
149    def format_for_human(self, files):
150        # type: (List[str]) -> None
151        if not files:
152            logger.info('Nothing cached.')
153            return
154
155        results = []
156        for filename in files:
157            wheel = os.path.basename(filename)
158            size = filesystem.format_file_size(filename)
159            results.append(' - {} ({})'.format(wheel, size))
160        logger.info('Cache contents:\n')
161        logger.info('\n'.join(sorted(results)))
162
163    def format_for_abspath(self, files):
164        # type: (List[str]) -> None
165        if not files:
166            return
167
168        results = []
169        for filename in files:
170            results.append(filename)
171
172        logger.info('\n'.join(sorted(results)))
173
174    def remove_cache_items(self, options, args):
175        # type: (Values, List[Any]) -> None
176        if len(args) > 1:
177            raise CommandError('Too many arguments')
178
179        if not args:
180            raise CommandError('Please provide a pattern')
181
182        files = self._find_wheels(options, args[0])
183
184        # Only fetch http files if no specific pattern given
185        if args[0] == '*':
186            files += self._find_http_files(options)
187
188        if not files:
189            raise CommandError('No matching packages')
190
191        for filename in files:
192            os.unlink(filename)
193            logger.debug('Removed %s', filename)
194        logger.info('Files removed: %s', len(files))
195
196    def purge_cache(self, options, args):
197        # type: (Values, List[Any]) -> None
198        if args:
199            raise CommandError('Too many arguments')
200
201        return self.remove_cache_items(options, ['*'])
202
203    def _cache_dir(self, options, subdir):
204        # type: (Values, str) -> str
205        return os.path.join(options.cache_dir, subdir)
206
207    def _find_http_files(self, options):
208        # type: (Values) -> List[str]
209        http_dir = self._cache_dir(options, 'http')
210        return filesystem.find_files(http_dir, '*')
211
212    def _find_wheels(self, options, pattern):
213        # type: (Values, str) -> List[str]
214        wheel_dir = self._cache_dir(options, 'wheels')
215
216        # The wheel filename format, as specified in PEP 427, is:
217        #     {distribution}-{version}(-{build})?-{python}-{abi}-{platform}.whl
218        #
219        # Additionally, non-alphanumeric values in the distribution are
220        # normalized to underscores (_), meaning hyphens can never occur
221        # before `-{version}`.
222        #
223        # Given that information:
224        # - If the pattern we're given contains a hyphen (-), the user is
225        #   providing at least the version. Thus, we can just append `*.whl`
226        #   to match the rest of it.
227        # - If the pattern we're given doesn't contain a hyphen (-), the
228        #   user is only providing the name. Thus, we append `-*.whl` to
229        #   match the hyphen before the version, followed by anything else.
230        #
231        # PEP 427: https://www.python.org/dev/peps/pep-0427/
232        pattern = pattern + ("*.whl" if "-" in pattern else "-*.whl")
233
234        return filesystem.find_files(wheel_dir, pattern)
235