| Viewing file:  direct_url.py (6.74 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
""" PEP 610 """import json
 import re
 
 from pip._vendor import six
 from pip._vendor.six.moves.urllib import parse as urllib_parse
 
 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
 
 if MYPY_CHECK_RUNNING:
 from typing import (
 Any, Dict, Iterable, Optional, Type, TypeVar, Union
 )
 
 T = TypeVar("T")
 
 
 DIRECT_URL_METADATA_NAME = "direct_url.json"
 ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
 
 __all__ = [
 "DirectUrl",
 "DirectUrlValidationError",
 "DirInfo",
 "ArchiveInfo",
 "VcsInfo",
 ]
 
 
 class DirectUrlValidationError(Exception):
 pass
 
 
 def _get(d, expected_type, key, default=None):
 # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T]
 """Get value from dictionary and verify expected type."""
 if key not in d:
 return default
 value = d[key]
 if six.PY2 and expected_type is str:
 expected_type = six.string_types  # type: ignore
 if not isinstance(value, expected_type):
 raise DirectUrlValidationError(
 "{!r} has unexpected type for {} (expected {})".format(
 value, key, expected_type
 )
 )
 return value
 
 
 def _get_required(d, expected_type, key, default=None):
 # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
 value = _get(d, expected_type, key, default)
 if value is None:
 raise DirectUrlValidationError("{} must have a value".format(key))
 return value
 
 
 def _exactly_one_of(infos):
 # type: (Iterable[Optional[InfoType]]) -> InfoType
 infos = [info for info in infos if info is not None]
 if not infos:
 raise DirectUrlValidationError(
 "missing one of archive_info, dir_info, vcs_info"
 )
 if len(infos) > 1:
 raise DirectUrlValidationError(
 "more than one of archive_info, dir_info, vcs_info"
 )
 assert infos[0] is not None
 return infos[0]
 
 
 def _filter_none(**kwargs):
 # type: (Any) -> Dict[str, Any]
 """Make dict excluding None values."""
 return {k: v for k, v in kwargs.items() if v is not None}
 
 
 class VcsInfo(object):
 name = "vcs_info"
 
 def __init__(
 self,
 vcs,  # type: str
 commit_id,  # type: str
 requested_revision=None,  # type: Optional[str]
 resolved_revision=None,  # type: Optional[str]
 resolved_revision_type=None,  # type: Optional[str]
 ):
 self.vcs = vcs
 self.requested_revision = requested_revision
 self.commit_id = commit_id
 self.resolved_revision = resolved_revision
 self.resolved_revision_type = resolved_revision_type
 
 @classmethod
 def _from_dict(cls, d):
 # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
 if d is None:
 return None
 return cls(
 vcs=_get_required(d, str, "vcs"),
 commit_id=_get_required(d, str, "commit_id"),
 requested_revision=_get(d, str, "requested_revision"),
 resolved_revision=_get(d, str, "resolved_revision"),
 resolved_revision_type=_get(d, str, "resolved_revision_type"),
 )
 
 def _to_dict(self):
 # type: () -> Dict[str, Any]
 return _filter_none(
 vcs=self.vcs,
 requested_revision=self.requested_revision,
 commit_id=self.commit_id,
 resolved_revision=self.resolved_revision,
 resolved_revision_type=self.resolved_revision_type,
 )
 
 
 class ArchiveInfo(object):
 name = "archive_info"
 
 def __init__(
 self,
 hash=None,  # type: Optional[str]
 ):
 self.hash = hash
 
 @classmethod
 def _from_dict(cls, d):
 # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
 if d is None:
 return None
 return cls(hash=_get(d, str, "hash"))
 
 def _to_dict(self):
 # type: () -> Dict[str, Any]
 return _filter_none(hash=self.hash)
 
 
 class DirInfo(object):
 name = "dir_info"
 
 def __init__(
 self,
 editable=False,  # type: bool
 ):
 self.editable = editable
 
 @classmethod
 def _from_dict(cls, d):
 # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
 if d is None:
 return None
 return cls(
 editable=_get_required(d, bool, "editable", default=False)
 )
 
 def _to_dict(self):
 # type: () -> Dict[str, Any]
 return _filter_none(editable=self.editable or None)
 
 
 if MYPY_CHECK_RUNNING:
 InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
 
 
 class DirectUrl(object):
 
 def __init__(
 self,
 url,  # type: str
 info,  # type: InfoType
 subdirectory=None,  # type: Optional[str]
 ):
 self.url = url
 self.info = info
 self.subdirectory = subdirectory
 
 def _remove_auth_from_netloc(self, netloc):
 # type: (str) -> str
 if "@" not in netloc:
 return netloc
 user_pass, netloc_no_user_pass = netloc.split("@", 1)
 if (
 isinstance(self.info, VcsInfo) and
 self.info.vcs == "git" and
 user_pass == "git"
 ):
 return netloc
 if ENV_VAR_RE.match(user_pass):
 return netloc
 return netloc_no_user_pass
 
 @property
 def redacted_url(self):
 # type: () -> str
 """url with user:password part removed unless it is formed with
 environment variables as specified in PEP 610, or it is ``git``
 in the case of a git URL.
 """
 purl = urllib_parse.urlsplit(self.url)
 netloc = self._remove_auth_from_netloc(purl.netloc)
 surl = urllib_parse.urlunsplit(
 (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
 )
 return surl
 
 def validate(self):
 # type: () -> None
 self.from_dict(self.to_dict())
 
 @classmethod
 def from_dict(cls, d):
 # type: (Dict[str, Any]) -> DirectUrl
 return DirectUrl(
 url=_get_required(d, str, "url"),
 subdirectory=_get(d, str, "subdirectory"),
 info=_exactly_one_of(
 [
 ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
 DirInfo._from_dict(_get(d, dict, "dir_info")),
 VcsInfo._from_dict(_get(d, dict, "vcs_info")),
 ]
 ),
 )
 
 def to_dict(self):
 # type: () -> Dict[str, Any]
 res = _filter_none(
 url=self.redacted_url,
 subdirectory=self.subdirectory,
 )
 res[self.info.name] = self.info._to_dict()
 return res
 
 @classmethod
 def from_json(cls, s):
 # type: (str) -> DirectUrl
 return cls.from_dict(json.loads(s))
 
 def to_json(self):
 # type: () -> str
 return json.dumps(self.to_dict(), sort_keys=True)
 
 |