| Viewing file:  cache.py (4.51 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
"""HTTP cache implementation."""
 
 import os
 from contextlib import contextmanager
 from datetime import datetime
 from typing import BinaryIO, Generator, Optional, Union
 
 from pip._vendor.cachecontrol.cache import SeparateBodyBaseCache
 from pip._vendor.cachecontrol.caches import SeparateBodyFileCache
 from pip._vendor.requests.models import Response
 
 from pip._internal.utils.filesystem import adjacent_tmp_file, replace
 from pip._internal.utils.misc import ensure_dir
 
 
 def is_from_cache(response: Response) -> bool:
 return getattr(response, "from_cache", False)
 
 
 @contextmanager
 def suppressed_cache_errors() -> Generator[None, None, None]:
 """If we can't access the cache then we can just skip caching and process
 requests as if caching wasn't enabled.
 """
 try:
 yield
 except OSError:
 pass
 
 
 class SafeFileCache(SeparateBodyBaseCache):
 """
 A file based cache which is safe to use even when the target directory may
 not be accessible or writable.
 
 There is a race condition when two processes try to write and/or read the
 same entry at the same time, since each entry consists of two separate
 files (https://github.com/psf/cachecontrol/issues/324).  We therefore have
 additional logic that makes sure that both files to be present before
 returning an entry; this fixes the read side of the race condition.
 
 For the write side, we assume that the server will only ever return the
 same data for the same URL, which ought to be the case for files pip is
 downloading.  PyPI does not have a mechanism to swap out a wheel for
 another wheel, for example.  If this assumption is not true, the
 CacheControl issue will need to be fixed.
 """
 
 def __init__(self, directory: str) -> None:
 assert directory is not None, "Cache directory must not be None."
 super().__init__()
 self.directory = directory
 
 def _get_cache_path(self, name: str) -> str:
 # From cachecontrol.caches.file_cache.FileCache._fn, brought into our
 # class for backwards-compatibility and to avoid using a non-public
 # method.
 hashed = SeparateBodyFileCache.encode(name)
 parts = list(hashed[:5]) + [hashed]
 return os.path.join(self.directory, *parts)
 
 def get(self, key: str) -> Optional[bytes]:
 # The cache entry is only valid if both metadata and body exist.
 metadata_path = self._get_cache_path(key)
 body_path = metadata_path + ".body"
 if not (os.path.exists(metadata_path) and os.path.exists(body_path)):
 return None
 with suppressed_cache_errors():
 with open(metadata_path, "rb") as f:
 return f.read()
 
 def _write(self, path: str, data: bytes) -> None:
 with suppressed_cache_errors():
 ensure_dir(os.path.dirname(path))
 
 with adjacent_tmp_file(path) as f:
 f.write(data)
 # Inherit the read/write permissions of the cache directory
 # to enable multi-user cache use-cases.
 mode = (
 os.stat(self.directory).st_mode
 & 0o666  # select read/write permissions of cache directory
 | 0o600  # set owner read/write permissions
 )
 # Change permissions only if there is no risk of following a symlink.
 if os.chmod in os.supports_fd:
 os.chmod(f.fileno(), mode)
 elif os.chmod in os.supports_follow_symlinks:
 os.chmod(f.name, mode, follow_symlinks=False)
 
 replace(f.name, path)
 
 def set(
 self, key: str, value: bytes, expires: Union[int, datetime, None] = None
 ) -> None:
 path = self._get_cache_path(key)
 self._write(path, value)
 
 def delete(self, key: str) -> None:
 path = self._get_cache_path(key)
 with suppressed_cache_errors():
 os.remove(path)
 with suppressed_cache_errors():
 os.remove(path + ".body")
 
 def get_body(self, key: str) -> Optional[BinaryIO]:
 # The cache entry is only valid if both metadata and body exist.
 metadata_path = self._get_cache_path(key)
 body_path = metadata_path + ".body"
 if not (os.path.exists(metadata_path) and os.path.exists(body_path)):
 return None
 with suppressed_cache_errors():
 return open(body_path, "rb")
 
 def set_body(self, key: str, body: bytes) -> None:
 path = self._get_cache_path(key) + ".body"
 self._write(path, body)
 
 |