| Viewing file:  search.py (5.56 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
import loggingimport shutil
 import sys
 import textwrap
 import xmlrpc.client
 from collections import OrderedDict
 from optparse import Values
 from typing import TYPE_CHECKING, Dict, List, Optional
 
 from pip._vendor.packaging.version import parse as parse_version
 
 from pip._internal.cli.base_command import Command
 from pip._internal.cli.req_command import SessionCommandMixin
 from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
 from pip._internal.exceptions import CommandError
 from pip._internal.metadata import get_default_environment
 from pip._internal.models.index import PyPI
 from pip._internal.network.xmlrpc import PipXmlrpcTransport
 from pip._internal.utils.logging import indent_log
 from pip._internal.utils.misc import write_output
 
 if TYPE_CHECKING:
 from typing import TypedDict
 
 class TransformedHit(TypedDict):
 name: str
 summary: str
 versions: List[str]
 
 
 logger = logging.getLogger(__name__)
 
 
 class SearchCommand(Command, SessionCommandMixin):
 """Search for PyPI packages whose name or summary contains <query>."""
 
 usage = """
 %prog [options] <query>"""
 ignore_require_venv = True
 
 def add_options(self) -> None:
 self.cmd_opts.add_option(
 "-i",
 "--index",
 dest="index",
 metavar="URL",
 default=PyPI.pypi_url,
 help="Base URL of Python Package Index (default %default)",
 )
 
 self.parser.insert_option_group(0, self.cmd_opts)
 
 def run(self, options: Values, args: List[str]) -> int:
 if not args:
 raise CommandError("Missing required argument (search query).")
 query = args
 pypi_hits = self.search(query, options)
 hits = transform_hits(pypi_hits)
 
 terminal_width = None
 if sys.stdout.isatty():
 terminal_width = shutil.get_terminal_size()[0]
 
 print_results(hits, terminal_width=terminal_width)
 if pypi_hits:
 return SUCCESS
 return NO_MATCHES_FOUND
 
 def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
 index_url = options.index
 
 session = self.get_default_session(options)
 
 transport = PipXmlrpcTransport(index_url, session)
 pypi = xmlrpc.client.ServerProxy(index_url, transport)
 try:
 hits = pypi.search({"name": query, "summary": query}, "or")
 except xmlrpc.client.Fault as fault:
 message = "XMLRPC request failed [code: {code}]\n{string}".format(
 code=fault.faultCode,
 string=fault.faultString,
 )
 raise CommandError(message)
 assert isinstance(hits, list)
 return hits
 
 
 def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
 """
 The list from pypi is really a list of versions. We want a list of
 packages with the list of versions stored inline. This converts the
 list from pypi into one we can use.
 """
 packages: Dict[str, "TransformedHit"] = OrderedDict()
 for hit in hits:
 name = hit["name"]
 summary = hit["summary"]
 version = hit["version"]
 
 if name not in packages.keys():
 packages[name] = {
 "name": name,
 "summary": summary,
 "versions": [version],
 }
 else:
 packages[name]["versions"].append(version)
 
 # if this is the highest version, replace summary and score
 if version == highest_version(packages[name]["versions"]):
 packages[name]["summary"] = summary
 
 return list(packages.values())
 
 
 def print_dist_installation_info(name: str, latest: str) -> None:
 env = get_default_environment()
 dist = env.get_distribution(name)
 if dist is not None:
 with indent_log():
 if dist.version == latest:
 write_output("INSTALLED: %s (latest)", dist.version)
 else:
 write_output("INSTALLED: %s", dist.version)
 if parse_version(latest).pre:
 write_output(
 "LATEST:    %s (pre-release; install"
 " with `pip install --pre`)",
 latest,
 )
 else:
 write_output("LATEST:    %s", latest)
 
 
 def print_results(
 hits: List["TransformedHit"],
 name_column_width: Optional[int] = None,
 terminal_width: Optional[int] = None,
 ) -> None:
 if not hits:
 return
 if name_column_width is None:
 name_column_width = (
 max(
 [
 len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
 for hit in hits
 ]
 )
 + 4
 )
 
 for hit in hits:
 name = hit["name"]
 summary = hit["summary"] or ""
 latest = highest_version(hit.get("versions", ["-"]))
 if terminal_width is not None:
 target_width = terminal_width - name_column_width - 5
 if target_width > 10:
 # wrap and indent summary to fit terminal
 summary_lines = textwrap.wrap(summary, target_width)
 summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines)
 
 name_latest = f"{name} ({latest})"
 line = f"{name_latest:{name_column_width}} - {summary}"
 try:
 write_output(line)
 print_dist_installation_info(name, latest)
 except UnicodeEncodeError:
 pass
 
 
 def highest_version(versions: List[str]) -> str:
 return max(versions, key=parse_version)
 
 |