| Viewing file:  download.py (5.91 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
"""Download files with progress indicators."""
 
 import email.message
 import logging
 import mimetypes
 import os
 from typing import Iterable, Optional, Tuple
 
 from pip._vendor.requests.models import Response
 
 from pip._internal.cli.progress_bars import get_download_progress_renderer
 from pip._internal.exceptions import NetworkConnectionError
 from pip._internal.models.index import PyPI
 from pip._internal.models.link import Link
 from pip._internal.network.cache import is_from_cache
 from pip._internal.network.session import PipSession
 from pip._internal.network.utils import HEADERS, raise_for_status, response_chunks
 from pip._internal.utils.misc import format_size, redact_auth_from_url, splitext
 
 logger = logging.getLogger(__name__)
 
 
 def _get_http_response_size(resp: Response) -> Optional[int]:
     """Return the response's Content-Length header as an int, or None.

     None is returned when the header is missing or when its value cannot
     be converted by ``int()``.
     """
     size: Optional[int] = None
     try:
         size = int(resp.headers["content-length"])
     except (KeyError, TypeError, ValueError):
         # Missing or unparsable header: leave the size unknown.
         pass
     return size
 
 
 def _prepare_download(
     resp: Response,
     link: Link,
     progress_bar: str,
 ) -> Iterable[bytes]:
     """Log the download and return the response body as an iterable of chunks.

     The chunks are wrapped in a progress renderer unless the logger is
     quieter than INFO, the response came from the cache, or the response
     reports a non-zero size that does not exceed 512 * 1024.
     """
     size = _get_http_response_size(resp)

     # Links hosted on PyPI's file storage domain are logged through
     # ``show_url``; any other link is logged by its URL minus the fragment.
     on_pypi_storage = link.netloc == PyPI.file_storage_domain
     display_url = link.show_url if on_pypi_storage else link.url_without_fragment

     description = redact_auth_from_url(display_url)
     if size:
         description = f"{description} ({format_size(size)})"

     template = "Using cached %s" if is_from_cache(resp) else "Downloading %s"
     logger.info(template, description)

     if logger.getEffectiveLevel() > logging.INFO or is_from_cache(resp):
         wants_progress = False
     else:
         # An unknown (or zero) size still gets a progress indicator.
         wants_progress = not size or size > (512 * 1024)

     body_chunks = response_chunks(resp)
     if wants_progress:
         render = get_download_progress_renderer(bar_type=progress_bar, size=size)
         return render(body_chunks)
     return body_chunks
 
 
 def sanitize_content_filename(filename: str) -> str:
     """
     Sanitize the "filename" value from a Content-Disposition header.

     Only the final path component is kept, so directory parts (including
     ".." segments) are discarded.
     """
     return os.path.basename(filename)


 def parse_content_disposition(content_disposition: str, default_filename: str) -> str:
     """
     Parse the "filename" value from a Content-Disposition header, and
     return the default filename if the result is empty.

     Both the plain ``filename=`` form and the RFC 2231 extended
     ``filename*=charset'language'value`` form are handled.
     """
     # Imported here so the function does not depend on ``email.utils``
     # having been loaded as a side effect of another import.
     from email.utils import collapse_rfc2231_value

     m = email.message.Message()
     # get_param() reads the "content-type" header by default, so the value
     # is stored under that name to reuse its parameter parser.
     m["content-type"] = content_disposition
     filename = m.get_param("filename")
     if isinstance(filename, tuple):
         # RFC 2231 encoded parameters come back as a
         # (charset, language, value) tuple; calling str() on it would turn
         # the tuple's repr into the file name.
         filename = collapse_rfc2231_value(filename)
     if filename:
         # We need to sanitize the filename to prevent directory traversal
         # in case the filename contains ".." path parts.
         filename = sanitize_content_filename(str(filename))
     return filename or default_filename
 
 
 def _get_http_response_filename(resp: Response, link: Link) -> str:
     """Choose a filename for the downloaded response.

     Starts from the link's filename, prefers a name taken from the
     Content-Disposition header when one is sent, and, if the name has no
     extension, appends one guessed from the Content-Type header or,
     failing that, taken from the response URL when it differs from the
     link URL.
     """
     name = link.filename
     disposition = resp.headers.get("content-disposition")
     if disposition:
         # The link's filename stays as the fallback if the header yields
         # no usable name.
         name = parse_content_disposition(disposition, name)

     if splitext(name)[1]:
         # The name already carries an extension; nothing to add.
         return name

     guessed_ext = mimetypes.guess_extension(resp.headers.get("content-type", ""))
     if guessed_ext:
         return name + guessed_ext

     if link.url != resp.url:
         # Appending an empty extension leaves the name unchanged.
         return name + os.path.splitext(resp.url)[1]
     return name
 
 
 def _http_get_download(session: PipSession, link: Link) -> Response:
     """Issue a streaming GET for the link and return the checked response.

     The URL fragment is dropped before the request is made, and the
     response is passed through ``raise_for_status`` before being returned.
     """
     url_sans_fragment, _, _ = link.url.partition("#")
     response = session.get(url_sans_fragment, headers=HEADERS, stream=True)
     raise_for_status(response)
     return response
 
 
 class Downloader:
     """Callable that downloads a single link into a directory."""

     def __init__(
         self,
         session: PipSession,
         progress_bar: str,
     ) -> None:
         # Both values are stored as given and read back by ``__call__``.
         self._progress_bar = progress_bar
         self._session = session

     def __call__(self, link: Link, location: str) -> Tuple[str, str]:
         """Download the file given by link into location.

         Returns the path of the written file and the response's
         Content-Type header ("" when the header is absent).
         """
         try:
             response = _http_get_download(self._session, link)
         except NetworkConnectionError as exc:
             assert exc.response is not None
             status = exc.response.status_code
             logger.critical("HTTP error %s while getting %s", status, link)
             raise

         target_path = os.path.join(
             location, _get_http_response_filename(response, link)
         )

         # The chunk iterable is prepared (and the download logged) before
         # the destination file is opened.
         body = _prepare_download(response, link, self._progress_bar)
         with open(target_path, "wb") as out:
             for piece in body:
                 out.write(piece)
         return target_path, response.headers.get("Content-Type", "")
 
 
 class BatchDownloader:
     """Callable that downloads several links, yielding each result lazily."""

     def __init__(
         self,
         session: PipSession,
         progress_bar: str,
     ) -> None:
         # Both values are stored as given and read back by ``__call__``.
         self._progress_bar = progress_bar
         self._session = session

     def __call__(
         self, links: Iterable[Link], location: str
     ) -> Iterable[Tuple[Link, Tuple[str, str]]]:
         """Download the files given by links into location.

         Yields ``(link, (filepath, content_type))`` after each file has
         been written; content_type is "" when the header is absent.
         """
         for current in links:
             try:
                 response = _http_get_download(self._session, current)
             except NetworkConnectionError as exc:
                 assert exc.response is not None
                 status = exc.response.status_code
                 logger.critical("HTTP error %s while getting %s", status, current)
                 raise

             target_path = os.path.join(
                 location, _get_http_response_filename(response, current)
             )

             # The chunk iterable is prepared (and the download logged)
             # before the destination file is opened.
             body = _prepare_download(response, current, self._progress_bar)
             with open(target_path, "wb") as out:
                 for piece in body:
                     out.write(piece)
             mime = response.headers.get("Content-Type", "")
             yield current, (target_path, mime)
 
 |