| Viewing file:  archive_util.py (7.17 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
"""Utilities for extracting common archive formats"""
 import zipfile
 import tarfile
 import os
 import shutil
 import posixpath
 import contextlib
 from distutils.errors import DistutilsError
 
 from ._path import ensure_directory
 
 __all__ = [
 "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
 "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
 ]
 
 
 class UnrecognizedFormat(DistutilsError):
 """Couldn't recognize the archive type"""
 
 
 def default_filter(src, dst):
 """The default progress/filter callback; returns True for all files"""
 return dst
 
 
 def unpack_archive(
 filename, extract_dir, progress_filter=default_filter,
 drivers=None):
 """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
 
 `progress_filter` is a function taking two arguments: a source path
 internal to the archive ('/'-separated), and a filesystem path where it
 will be extracted.  The callback must return the desired extract path
 (which may be the same as the one passed in), or else ``None`` to skip
 that file or directory.  The callback can thus be used to report on the
 progress of the extraction, as well as to filter the items extracted or
 alter their extraction paths.
 
 `drivers`, if supplied, must be a non-empty sequence of functions with the
 same signature as this function (minus the `drivers` argument), that raise
 ``UnrecognizedFormat`` if they do not support extracting the designated
 archive type.  The `drivers` are tried in sequence until one is found that
 does not raise an error, or until all are exhausted (in which case
 ``UnrecognizedFormat`` is raised).  If you do not supply a sequence of
 drivers, the module's ``extraction_drivers`` constant will be used, which
 means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
 order.
 """
 for driver in drivers or extraction_drivers:
 try:
 driver(filename, extract_dir, progress_filter)
 except UnrecognizedFormat:
 continue
 else:
 return
 else:
 raise UnrecognizedFormat(
 "Not a recognized archive type: %s" % filename
 )
 
 
 def unpack_directory(filename, extract_dir, progress_filter=default_filter):
 """"Unpack" a directory, using the same interface as for archives
 
 Raises ``UnrecognizedFormat`` if `filename` is not a directory
 """
 if not os.path.isdir(filename):
 raise UnrecognizedFormat("%s is not a directory" % filename)
 
 paths = {
 filename: ('', extract_dir),
 }
 for base, dirs, files in os.walk(filename):
 src, dst = paths[base]
 for d in dirs:
 paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
 for f in files:
 target = os.path.join(dst, f)
 target = progress_filter(src + f, target)
 if not target:
 # skip non-files
 continue
 ensure_directory(target)
 f = os.path.join(base, f)
 shutil.copyfile(f, target)
 shutil.copystat(f, target)
 
 
 def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
 """Unpack zip `filename` to `extract_dir`
 
 Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
 by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for an explanation
 of the `progress_filter` argument.
 """
 
 if not zipfile.is_zipfile(filename):
 raise UnrecognizedFormat("%s is not a zip file" % (filename,))
 
 with zipfile.ZipFile(filename) as z:
 _unpack_zipfile_obj(z, extract_dir, progress_filter)
 
 
 def _unpack_zipfile_obj(zipfile_obj, extract_dir, progress_filter=default_filter):
 """Internal/private API used by other parts of setuptools.
 Similar to ``unpack_zipfile``, but receives an already opened :obj:`zipfile.ZipFile`
 object instead of a filename.
 """
 for info in zipfile_obj.infolist():
 name = info.filename
 
 # don't extract absolute paths or ones with .. in them
 if name.startswith('/') or '..' in name.split('/'):
 continue
 
 target = os.path.join(extract_dir, *name.split('/'))
 target = progress_filter(name, target)
 if not target:
 continue
 if name.endswith('/'):
 # directory
 ensure_directory(target)
 else:
 # file
 ensure_directory(target)
 data = zipfile_obj.read(info.filename)
 with open(target, 'wb') as f:
 f.write(data)
 unix_attributes = info.external_attr >> 16
 if unix_attributes:
 os.chmod(target, unix_attributes)
 
 
 def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
 """Resolve any links and extract link targets as normal files."""
 while tar_member_obj is not None and (
 tar_member_obj.islnk() or tar_member_obj.issym()):
 linkpath = tar_member_obj.linkname
 if tar_member_obj.issym():
 base = posixpath.dirname(tar_member_obj.name)
 linkpath = posixpath.join(base, linkpath)
 linkpath = posixpath.normpath(linkpath)
 tar_member_obj = tar_obj._getmember(linkpath)
 
 is_file_or_dir = (
 tar_member_obj is not None and
 (tar_member_obj.isfile() or tar_member_obj.isdir())
 )
 if is_file_or_dir:
 return tar_member_obj
 
 raise LookupError('Got unknown file type')
 
 
 def _iter_open_tar(tar_obj, extract_dir, progress_filter):
 """Emit member-destination pairs from a tar archive."""
 # don't do any chowning!
 tar_obj.chown = lambda *args: None
 
 with contextlib.closing(tar_obj):
 for member in tar_obj:
 name = member.name
 # don't extract absolute paths or ones with .. in them
 if name.startswith('/') or '..' in name.split('/'):
 continue
 
 prelim_dst = os.path.join(extract_dir, *name.split('/'))
 
 try:
 member = _resolve_tar_file_or_dir(tar_obj, member)
 except LookupError:
 continue
 
 final_dst = progress_filter(name, prelim_dst)
 if not final_dst:
 continue
 
 if final_dst.endswith(os.sep):
 final_dst = final_dst[:-1]
 
 yield member, final_dst
 
 
 def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
 """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
 
 Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
 by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
 of the `progress_filter` argument.
 """
 try:
 tarobj = tarfile.open(filename)
 except tarfile.TarError as e:
 raise UnrecognizedFormat(
 "%s is not a compressed or uncompressed tar file" % (filename,)
 ) from e
 
 for member, final_dst in _iter_open_tar(
 tarobj, extract_dir, progress_filter,
 ):
 try:
 # XXX Ugh
 tarobj._extract_member(member, final_dst)
 except tarfile.ExtractError:
 # chown/chmod/mkfifo/mknode/makedev failed
 pass
 
 return True
 
 
 extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile
 
 |