1from __future__ import absolute_import
2
3import logging
4import sys
5import textwrap
6from collections import OrderedDict
7
8from pip._vendor import pkg_resources
9from pip._vendor.packaging.version import parse as parse_version
10
# NOTE: XMLRPC Client is not annotated in typeshed as of 2017-07-17, which is
#       why we ignore the type on this import
13from pip._vendor.six.moves import xmlrpc_client  # type: ignore
14
15from pip._internal.cli.base_command import Command
16from pip._internal.cli.req_command import SessionCommandMixin
17from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
18from pip._internal.exceptions import CommandError
19from pip._internal.models.index import PyPI
20from pip._internal.network.xmlrpc import PipXmlrpcTransport
21from pip._internal.utils.compat import get_terminal_size
22from pip._internal.utils.logging import indent_log
23from pip._internal.utils.misc import get_distribution, write_output
24from pip._internal.utils.typing import MYPY_CHECK_RUNNING
25
if MYPY_CHECK_RUNNING:
    # These imports only run when MYPY_CHECK_RUNNING is truthy. In this file
    # the imported names appear only inside `# type:` comments.
    from optparse import Values
    from typing import Dict, List, Optional

    from typing_extensions import TypedDict
    # Shape of one entry returned by transform_hits(): a package name, a
    # summary, and the list of version strings collected for that package.
    TransformedHit = TypedDict(
        'TransformedHit',
        {'name': str, 'summary': str, 'versions': List[str]},
    )

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
37
38
class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    # NOTE(review): the class docstring and `usage` are presumably consumed
    # by the Command base class as help text -- keep both verbatim.
    usage = """
      %prog [options] <query>"""
    # NOTE(review): presumably lets this command run outside a virtualenv
    # even when one is otherwise required -- confirm against Command.
    ignore_require_venv = True

    def add_options(self):
        # type: () -> None
        """Register ``-i/--index`` and attach the command's option group."""
        index_option = dict(
            dest='index',
            metavar='URL',
            default=PyPI.pypi_url,
            help='Base URL of Python Package Index (default %default)',
        )
        self.cmd_opts.add_option('-i', '--index', **index_option)

        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options, args):
        # type: (Values, List[str]) -> int
        """Search the index for ``args`` and print the matching packages.

        :param options: Parsed command-line options; ``options.index`` is
            the index URL to query.
        :param args: Positional arguments, all of which form the query.
        :return: ``SUCCESS`` when the index returned at least one hit,
            ``NO_MATCHES_FOUND`` otherwise.
        :raises CommandError: If no query was given, or if the search
            request fails (see ``search``).
        """
        if not args:
            raise CommandError('Missing required argument (search query).')

        # The full list of positional arguments is passed on as the query.
        pypi_hits = self.search(args, options)
        packages = transform_hits(pypi_hits)

        # A terminal width is only supplied when stdout is a tty; without
        # it print_results() does not wrap summaries.
        width = get_terminal_size()[0] if sys.stdout.isatty() else None
        print_results(packages, terminal_width=width)

        return SUCCESS if pypi_hits else NO_MATCHES_FOUND

    def search(self, query, options):
        # type: (List[str], Values) -> List[Dict[str, str]]
        """Call the index's XML-RPC ``search`` method and return its result.

        The query is applied to both the ``name`` and ``summary`` fields,
        combined with the ``'or'`` operator.

        :param query: Search terms.
        :param options: Parsed options; supplies ``index`` and the settings
            used to build the session.
        :raises CommandError: If the server answers with an XML-RPC fault.
        """
        index_url = options.index
        session = self.get_default_session(options)

        # Route the XML-RPC traffic through the session via pip's transport.
        proxy = xmlrpc_client.ServerProxy(
            index_url,
            PipXmlrpcTransport(index_url, session),
        )
        spec = {'name': query, 'summary': query}
        try:
            return proxy.search(spec, 'or')
        except xmlrpc_client.Fault as fault:
            raise CommandError(
                "XMLRPC request failed [code: {code}]\n{string}".format(
                    code=fault.faultCode,
                    string=fault.faultString,
                )
            )
91
92
def transform_hits(hits):
    # type: (List[Dict[str, str]]) -> List[TransformedHit]
    """
    The list from pypi is really a list of versions. We want a list of
    packages with the list of versions stored inline. This converts the
    list from pypi into one we can use.

    :param hits: Mappings that each provide ``name``, ``summary`` and
        ``version`` keys.
    :return: One entry per distinct name, in order of first appearance,
        holding every version seen for that name. The summary is taken
        from the hit whose version ranks highest among those seen so far.
    """
    packages = OrderedDict()  # type: OrderedDict[str, TransformedHit]
    for hit in hits:
        name = hit['name']
        summary = hit['summary']
        version = hit['version']

        # Look the package up directly in the dict; going through .keys()
        # builds a list on every iteration under Python 2.
        package = packages.get(name)
        if package is None:
            packages[name] = {
                'name': name,
                'summary': summary,
                'versions': [version],
            }
            continue

        package['versions'].append(version)

        # if this is the highest version so far, use its summary
        if version == highest_version(package['versions']):
            package['summary'] = summary

    return list(packages.values())
120
121
def print_results(hits, name_column_width=None, terminal_width=None):
    # type: (List[TransformedHit], Optional[int], Optional[int]) -> None
    """Write one line per hit, plus install details for installed packages.

    Each line has the form ``name (latest) - summary``. When a hit's name
    is found in ``pkg_resources.working_set``, the installed version is
    written below it, along with the latest version if the two differ.

    :param hits: Packages as produced by ``transform_hits``. Nothing is
        written when this is empty.
    :param name_column_width: Width of the ``name (latest)`` column. When
        ``None`` it is derived from the longest name/latest-version pair.
    :param terminal_width: Width used to wrap summaries. ``None`` disables
        wrapping.
    """
    if not hits:
        return
    if name_column_width is None:
        name_column_width = max(
            len(hit['name']) + len(highest_version(hit.get('versions', ['-'])))
            for hit in hits
        ) + 4

    # Summaries are wrapped only when a terminal width is known and it
    # leaves more than 10 columns for the summary text. Both values are the
    # same for every hit, so compute them once.
    summary_width = None
    if terminal_width is not None:
        summary_width = terminal_width - name_column_width - 5
        if summary_width <= 10:
            summary_width = None
    continuation = '\n' + ' ' * (name_column_width + 3)

    # A set keeps the per-hit membership test cheap.
    installed_packages = {p.project_name for p in pkg_resources.working_set}
    for hit in hits:
        name = hit['name']
        summary = hit['summary'] or ''
        latest = highest_version(hit.get('versions', ['-']))
        if summary_width is not None:
            # wrap and indent summary to fit terminal
            summary = continuation.join(textwrap.wrap(summary, summary_width))

        # Pass the fields explicitly rather than through **locals(), so a
        # renamed local cannot silently break the format call.
        line = '{name_latest:{name_column_width}} - {summary}'.format(
            name_latest='{name} ({latest})'.format(name=name, latest=latest),
            name_column_width=name_column_width,
            summary=summary,
        )
        try:
            write_output(line)
            if name in installed_packages:
                dist = get_distribution(name)
                assert dist is not None
                with indent_log():
                    if dist.version == latest:
                        write_output('INSTALLED: %s (latest)', dist.version)
                    else:
                        write_output('INSTALLED: %s', dist.version)
                        if parse_version(latest).pre:
                            write_output('LATEST:    %s (pre-release; install'
                                         ' with "pip install --pre")', latest)
                        else:
                            write_output('LATEST:    %s', latest)
        except UnicodeEncodeError:
            # Deliberate best-effort: a hit whose text cannot be encoded is
            # skipped (presumably because the output stream's encoding
            # cannot represent it) instead of aborting the whole listing.
            pass
165
166
def highest_version(versions):
    # type: (List[str]) -> str
    """Return the entry of ``versions`` that ranks highest.

    Entries are ordered by ``parse_version``; the original string is
    returned, not the parsed version. An empty ``versions`` makes ``max``
    raise ``ValueError``.
    """
    newest = max(versions, key=parse_version)
    return newest
170