| Viewing file:  _adapters.py (4.4 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
from contextlib import suppressfrom io import TextIOWrapper
 
 from . import abc
 
 
 class SpecLoaderAdapter:
 """
 Adapt a package spec to adapt the underlying loader.
 """
 
 def __init__(self, spec, adapter=lambda spec: spec.loader):
 self.spec = spec
 self.loader = adapter(spec)
 
 def __getattr__(self, name):
 return getattr(self.spec, name)
 
 
 class TraversableResourcesLoader:
 """
 Adapt a loader to provide TraversableResources.
 """
 
 def __init__(self, spec):
 self.spec = spec
 
 def get_resource_reader(self, name):
 return CompatibilityFiles(self.spec)._native()
 
 
 def _io_wrapper(file, mode='r', *args, **kwargs):
 if mode == 'r':
 return TextIOWrapper(file, *args, **kwargs)
 elif mode == 'rb':
 return file
 raise ValueError(
 "Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode)
 )
 
 
 class CompatibilityFiles:
 """
 Adapter for an existing or non-existent resource reader
 to provide a compatibility .files().
 """
 
 class SpecPath(abc.Traversable):
 """
 Path tied to a module spec.
 Can be read and exposes the resource reader children.
 """
 
 def __init__(self, spec, reader):
 self._spec = spec
 self._reader = reader
 
 def iterdir(self):
 if not self._reader:
 return iter(())
 return iter(
 CompatibilityFiles.ChildPath(self._reader, path)
 for path in self._reader.contents()
 )
 
 def is_file(self):
 return False
 
 is_dir = is_file
 
 def joinpath(self, other):
 if not self._reader:
 return CompatibilityFiles.OrphanPath(other)
 return CompatibilityFiles.ChildPath(self._reader, other)
 
 @property
 def name(self):
 return self._spec.name
 
 def open(self, mode='r', *args, **kwargs):
 return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)
 
 class ChildPath(abc.Traversable):
 """
 Path tied to a resource reader child.
 Can be read but doesn't expose any meaningful children.
 """
 
 def __init__(self, reader, name):
 self._reader = reader
 self._name = name
 
 def iterdir(self):
 return iter(())
 
 def is_file(self):
 return self._reader.is_resource(self.name)
 
 def is_dir(self):
 return not self.is_file()
 
 def joinpath(self, other):
 return CompatibilityFiles.OrphanPath(self.name, other)
 
 @property
 def name(self):
 return self._name
 
 def open(self, mode='r', *args, **kwargs):
 return _io_wrapper(
 self._reader.open_resource(self.name), mode, *args, **kwargs
 )
 
 class OrphanPath(abc.Traversable):
 """
 Orphan path, not tied to a module spec or resource reader.
 Can't be read and doesn't expose any meaningful children.
 """
 
 def __init__(self, *path_parts):
 if len(path_parts) < 1:
 raise ValueError('Need at least one path part to construct a path')
 self._path = path_parts
 
 def iterdir(self):
 return iter(())
 
 def is_file(self):
 return False
 
 is_dir = is_file
 
 def joinpath(self, other):
 return CompatibilityFiles.OrphanPath(*self._path, other)
 
 @property
 def name(self):
 return self._path[-1]
 
 def open(self, mode='r', *args, **kwargs):
 raise FileNotFoundError("Can't open orphan path")
 
 def __init__(self, spec):
 self.spec = spec
 
 @property
 def _reader(self):
 with suppress(AttributeError):
 return self.spec.loader.get_resource_reader(self.spec.name)
 
 def _native(self):
 """
 Return the native reader if it supports files().
 """
 reader = self._reader
 return reader if hasattr(reader, 'files') else self
 
 def __getattr__(self, attr):
 return getattr(self._reader, attr)
 
 def files(self):
 return CompatibilityFiles.SpecPath(self.spec, self._reader)
 
 
 def wrap_spec(package):
 """
 Construct a package spec with traversable compatibility
 on the spec/loader/reader.
 """
 return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)
 
 |