1from __future__ import absolute_import 2 3import logging 4import sys 5import textwrap 6from collections import OrderedDict 7 8from pip._vendor import pkg_resources 9from pip._vendor.packaging.version import parse as parse_version 10 11# NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is 12# why we ignore the type on this import 13from pip._vendor.six.moves import xmlrpc_client # type: ignore 14 15from pip._internal.cli.base_command import Command 16from pip._internal.cli.req_command import SessionCommandMixin 17from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS 18from pip._internal.exceptions import CommandError 19from pip._internal.models.index import PyPI 20from pip._internal.network.xmlrpc import PipXmlrpcTransport 21from pip._internal.utils.compat import get_terminal_size 22from pip._internal.utils.logging import indent_log 23from pip._internal.utils.misc import get_distribution, write_output 24from pip._internal.utils.typing import MYPY_CHECK_RUNNING 25 26if MYPY_CHECK_RUNNING: 27 from optparse import Values 28 from typing import Dict, List, Optional 29 30 from typing_extensions import TypedDict 31 TransformedHit = TypedDict( 32 'TransformedHit', 33 {'name': str, 'summary': str, 'versions': List[str]}, 34 ) 35 36logger = logging.getLogger(__name__) 37 38 39class SearchCommand(Command, SessionCommandMixin): 40 """Search for PyPI packages whose name or summary contains <query>.""" 41 42 usage = """ 43 %prog [options] <query>""" 44 ignore_require_venv = True 45 46 def add_options(self): 47 # type: () -> None 48 self.cmd_opts.add_option( 49 '-i', '--index', 50 dest='index', 51 metavar='URL', 52 default=PyPI.pypi_url, 53 help='Base URL of Python Package Index (default %default)') 54 55 self.parser.insert_option_group(0, self.cmd_opts) 56 57 def run(self, options, args): 58 # type: (Values, List[str]) -> int 59 if not args: 60 raise CommandError('Missing required argument (search query).') 61 query = args 62 pypi_hits = self.search(query, options) 63 hits = transform_hits(pypi_hits) 64 65 terminal_width = None 66 if sys.stdout.isatty(): 67 terminal_width = get_terminal_size()[0] 68 69 print_results(hits, terminal_width=terminal_width) 70 if pypi_hits: 71 return SUCCESS 72 return NO_MATCHES_FOUND 73 74 def search(self, query, options): 75 # type: (List[str], Values) -> List[Dict[str, str]] 76 index_url = options.index 77 78 session = self.get_default_session(options) 79 80 transport = PipXmlrpcTransport(index_url, session) 81 pypi = xmlrpc_client.ServerProxy(index_url, transport) 82 try: 83 hits = pypi.search({'name': query, 'summary': query}, 'or') 84 except xmlrpc_client.Fault as fault: 85 message = "XMLRPC request failed [code: {code}]\n{string}".format( 86 code=fault.faultCode, 87 string=fault.faultString, 88 ) 89 raise CommandError(message) 90 return hits 91 92 93def transform_hits(hits): 94 # type: (List[Dict[str, str]]) -> List[TransformedHit] 95 """ 96 The list from pypi is really a list of versions. We want a list of 97 packages with the list of versions stored inline. This converts the 98 list from pypi into one we can use. 99 """ 100 packages = OrderedDict() # type: OrderedDict[str, TransformedHit] 101 for hit in hits: 102 name = hit['name'] 103 summary = hit['summary'] 104 version = hit['version'] 105 106 if name not in packages.keys(): 107 packages[name] = { 108 'name': name, 109 'summary': summary, 110 'versions': [version], 111 } 112 else: 113 packages[name]['versions'].append(version) 114 115 # if this is the highest version, replace summary and score 116 if version == highest_version(packages[name]['versions']): 117 packages[name]['summary'] = summary 118 119 return list(packages.values()) 120 121 122def print_results(hits, name_column_width=None, terminal_width=None): 123 # type: (List[TransformedHit], Optional[int], Optional[int]) -> None 124 if not hits: 125 return 126 if name_column_width is None: 127 name_column_width = max([ 128 len(hit['name']) + len(highest_version(hit.get('versions', ['-']))) 129 for hit in hits 130 ]) + 4 131 132 installed_packages = [p.project_name for p in pkg_resources.working_set] 133 for hit in hits: 134 name = hit['name'] 135 summary = hit['summary'] or '' 136 latest = highest_version(hit.get('versions', ['-'])) 137 if terminal_width is not None: 138 target_width = terminal_width - name_column_width - 5 139 if target_width > 10: 140 # wrap and indent summary to fit terminal 141 summary_lines = textwrap.wrap(summary, target_width) 142 summary = ('\n' + ' ' * (name_column_width + 3)).join( 143 summary_lines) 144 145 line = '{name_latest:{name_column_width}} - {summary}'.format( 146 name_latest='{name} ({latest})'.format(**locals()), 147 **locals()) 148 try: 149 write_output(line) 150 if name in installed_packages: 151 dist = get_distribution(name) 152 assert dist is not None 153 with indent_log(): 154 if dist.version == latest: 155 write_output('INSTALLED: %s (latest)', dist.version) 156 else: 157 write_output('INSTALLED: %s', dist.version) 158 if parse_version(latest).pre: 159 write_output('LATEST: %s (pre-release; install' 160 ' with "pip install --pre")', latest) 161 else: 162 write_output('LATEST: %s', latest) 163 except UnicodeEncodeError: 164 pass 165 166 167def highest_version(versions): 168 # type: (List[str]) -> str 169 return max(versions, key=parse_version) 170