| Viewing file:  context.py (5.29 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
import osimport subprocess
 import contextlib
 import functools
 import tempfile
 import shutil
 import operator
 
 
 @contextlib.contextmanager
 def pushd(dir):
 orig = os.getcwd()
 os.chdir(dir)
 try:
 yield dir
 finally:
 os.chdir(orig)
 
 
 @contextlib.contextmanager
 def tarball_context(url, target_dir=None, runner=None, pushd=pushd):
 """
 Get a tarball, extract it, change to that directory, yield, then
 clean up.
 `runner` is the function to invoke commands.
 `pushd` is a context manager for changing the directory.
 """
 if target_dir is None:
 target_dir = os.path.basename(url).replace('.tar.gz', '').replace('.tgz', '')
 if runner is None:
 runner = functools.partial(subprocess.check_call, shell=True)
 # In the tar command, use --strip-components=1 to strip the first path and
 #  then
 #  use -C to cause the files to be extracted to {target_dir}. This ensures
 #  that we always know where the files were extracted.
 runner('mkdir {target_dir}'.format(**vars()))
 try:
 getter = 'wget {url} -O -'
 extract = 'tar x{compression} --strip-components=1 -C {target_dir}'
 cmd = ' | '.join((getter, extract))
 runner(cmd.format(compression=infer_compression(url), **vars()))
 with pushd(target_dir):
 yield target_dir
 finally:
 runner('rm -Rf {target_dir}'.format(**vars()))
 
 
 def infer_compression(url):
 """
 Given a URL or filename, infer the compression code for tar.
 """
 # cheat and just assume it's the last two characters
 compression_indicator = url[-2:]
 mapping = dict(gz='z', bz='j', xz='J')
 # Assume 'z' (gzip) if no match
 return mapping.get(compression_indicator, 'z')
 
 
 @contextlib.contextmanager
 def temp_dir(remover=shutil.rmtree):
 """
 Create a temporary directory context. Pass a custom remover
 to override the removal behavior.
 """
 temp_dir = tempfile.mkdtemp()
 try:
 yield temp_dir
 finally:
 remover(temp_dir)
 
 
 @contextlib.contextmanager
 def repo_context(url, branch=None, quiet=True, dest_ctx=temp_dir):
 """
 Check out the repo indicated by url.
 
 If dest_ctx is supplied, it should be a context manager
 to yield the target directory for the check out.
 """
 exe = 'git' if 'git' in url else 'hg'
 with dest_ctx() as repo_dir:
 cmd = [exe, 'clone', url, repo_dir]
 if branch:
 cmd.extend(['--branch', branch])
 devnull = open(os.path.devnull, 'w')
 stdout = devnull if quiet else None
 subprocess.check_call(cmd, stdout=stdout)
 yield repo_dir
 
 
 @contextlib.contextmanager
 def null():
 yield
 
 
 class ExceptionTrap:
 """
 A context manager that will catch certain exceptions and provide an
 indication they occurred.
 
 >>> with ExceptionTrap() as trap:
 ...     raise Exception()
 >>> bool(trap)
 True
 
 >>> with ExceptionTrap() as trap:
 ...     pass
 >>> bool(trap)
 False
 
 >>> with ExceptionTrap(ValueError) as trap:
 ...     raise ValueError("1 + 1 is not 3")
 >>> bool(trap)
 True
 
 >>> with ExceptionTrap(ValueError) as trap:
 ...     raise Exception()
 Traceback (most recent call last):
 ...
 Exception
 
 >>> bool(trap)
 False
 """
 
 exc_info = None, None, None
 
 def __init__(self, exceptions=(Exception,)):
 self.exceptions = exceptions
 
 def __enter__(self):
 return self
 
 @property
 def type(self):
 return self.exc_info[0]
 
 @property
 def value(self):
 return self.exc_info[1]
 
 @property
 def tb(self):
 return self.exc_info[2]
 
 def __exit__(self, *exc_info):
 type = exc_info[0]
 matches = type and issubclass(type, self.exceptions)
 if matches:
 self.exc_info = exc_info
 return matches
 
 def __bool__(self):
 return bool(self.type)
 
 def raises(self, func, *, _test=bool):
 """
 Wrap func and replace the result with the truth
 value of the trap (True if an exception occurred).
 
 First, give the decorator an alias to support Python 3.8
 Syntax.
 
 >>> raises = ExceptionTrap(ValueError).raises
 
 Now decorate a function that always fails.
 
 >>> @raises
 ... def fail():
 ...     raise ValueError('failed')
 >>> fail()
 True
 """
 
 @functools.wraps(func)
 def wrapper(*args, **kwargs):
 with ExceptionTrap(self.exceptions) as trap:
 func(*args, **kwargs)
 return _test(trap)
 
 return wrapper
 
 def passes(self, func):
 """
 Wrap func and replace the result with the truth
 value of the trap (True if no exception).
 
 First, give the decorator an alias to support Python 3.8
 Syntax.
 
 >>> passes = ExceptionTrap(ValueError).passes
 
 Now decorate a function that always fails.
 
 >>> @passes
 ... def fail():
 ...     raise ValueError('failed')
 
 >>> fail()
 False
 """
 return self.raises(func, _test=operator.not_)
 
 
 class suppress(contextlib.suppress, contextlib.ContextDecorator):
 """
 A version of contextlib.suppress with decorator support.
 
 >>> @suppress(KeyError)
 ... def key_error():
 ...     {}['']
 >>> key_error()
 """
 
 |