| Viewing file:  runner.py (10.94 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
#!/usr/bin/env python3
 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
 """
 Unit test runner, providing new features on top of unittest module:
 - colourized output
 - parallel run (UNIX only)
 - print failures/tracebacks on CTRL+C
 - re-run failed tests only (make test-failed)
 
 Invocation examples:
 - make test
 - make test-failed
 
 Parallel:
 - make test-parallel
 - make test-process ARGS=--parallel
 """
 
 from __future__ import print_function
 
 import atexit
 import optparse
 import os
 import sys
 import textwrap
 import time
 import unittest
 
 
 try:
 import ctypes
 except ImportError:
 ctypes = None
 
 try:
 import concurrencytest  # pip install concurrencytest
 except ImportError:
 concurrencytest = None
 
 import psutil
 from psutil._common import hilite
 from psutil._common import print_color
 from psutil._common import term_supports_colors
 from psutil._compat import super
 from psutil.tests import CI_TESTING
 from psutil.tests import import_module_by_path
 from psutil.tests import print_sysinfo
 from psutil.tests import reap_children
 from psutil.tests import safe_rmpath
 
 
 VERBOSITY = 2
 FAILED_TESTS_FNAME = '.failed-tests.txt'
 NWORKERS = psutil.cpu_count() or 1
 USE_COLORS = not CI_TESTING and term_supports_colors()
 
 HERE = os.path.abspath(os.path.dirname(__file__))
 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
 
 
 def cprint(msg, color, bold=False, file=None):
 if file is None:
 file = sys.stderr if color == 'red' else sys.stdout
 if USE_COLORS:
 print_color(msg, color, bold=bold, file=file)
 else:
 print(msg, file=file)
 
 
 class TestLoader:
 
 testdir = HERE
 skip_files = ['test_memleaks.py']
 if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ:
 skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py'])
 
 def _get_testmods(self):
 return [os.path.join(self.testdir, x)
 for x in os.listdir(self.testdir)
 if x.startswith('test_') and x.endswith('.py') and
 x not in self.skip_files]
 
 def _iter_testmod_classes(self):
 """Iterate over all test files in this directory and return
 all TestCase classes in them.
 """
 for path in self._get_testmods():
 mod = import_module_by_path(path)
 for name in dir(mod):
 obj = getattr(mod, name)
 if isinstance(obj, type) and \
 issubclass(obj, unittest.TestCase):
 yield obj
 
 def all(self):
 suite = unittest.TestSuite()
 for obj in self._iter_testmod_classes():
 test = loadTestsFromTestCase(obj)
 suite.addTest(test)
 return suite
 
 def last_failed(self):
 # ...from previously failed test run
 suite = unittest.TestSuite()
 if not os.path.isfile(FAILED_TESTS_FNAME):
 return suite
 with open(FAILED_TESTS_FNAME, 'rt') as f:
 names = f.read().split()
 for n in names:
 test = unittest.defaultTestLoader.loadTestsFromName(n)
 suite.addTest(test)
 return suite
 
 def from_name(self, name):
 if name.endswith('.py'):
 name = os.path.splitext(os.path.basename(name))[0]
 return unittest.defaultTestLoader.loadTestsFromName(name)
 
 
 class ColouredResult(unittest.TextTestResult):
 
 def addSuccess(self, test):
 unittest.TestResult.addSuccess(self, test)
 cprint("OK", "green")
 
 def addError(self, test, err):
 unittest.TestResult.addError(self, test, err)
 cprint("ERROR", "red", bold=True)
 
 def addFailure(self, test, err):
 unittest.TestResult.addFailure(self, test, err)
 cprint("FAIL", "red")
 
 def addSkip(self, test, reason):
 unittest.TestResult.addSkip(self, test, reason)
 cprint("skipped: %s" % reason.strip(), "brown")
 
 def printErrorList(self, flavour, errors):
 flavour = hilite(flavour, "red", bold=flavour == 'ERROR')
 super().printErrorList(flavour, errors)
 
 
 class ColouredTextRunner(unittest.TextTestRunner):
 """
 A coloured text runner which also prints failed tests on KeyboardInterrupt
 and save failed tests in a file so that they can be re-run.
 """
 resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult
 
 def __init__(self, *args, **kwargs):
 super().__init__(*args, **kwargs)
 self.failed_tnames = set()
 
 def _makeResult(self):
 # Store result instance so that it can be accessed on
 # KeyboardInterrupt.
 self.result = super()._makeResult()
 return self.result
 
 def _write_last_failed(self):
 if self.failed_tnames:
 with open(FAILED_TESTS_FNAME, 'wt') as f:
 for tname in self.failed_tnames:
 f.write(tname + '\n')
 
 def _save_result(self, result):
 if not result.wasSuccessful():
 for t in result.errors + result.failures:
 tname = t[0].id()
 self.failed_tnames.add(tname)
 
 def _run(self, suite):
 try:
 result = super().run(suite)
 except (KeyboardInterrupt, SystemExit):
 result = self.runner.result
 result.printErrors()
 raise sys.exit(1)
 else:
 self._save_result(result)
 return result
 
 def _exit(self, success):
 if success:
 cprint("SUCCESS", "green", bold=True)
 safe_rmpath(FAILED_TESTS_FNAME)
 sys.exit(0)
 else:
 cprint("FAILED", "red", bold=True)
 self._write_last_failed()
 sys.exit(1)
 
 def run(self, suite):
 result = self._run(suite)
 self._exit(result.wasSuccessful())
 
 
 class ParallelRunner(ColouredTextRunner):
 
 @staticmethod
 def _parallelize(suite):
 def fdopen(fd, mode, *kwds):
 stream = orig_fdopen(fd, mode)
 atexit.register(stream.close)
 return stream
 
 # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
 # https://github.com/cgoldberg/concurrencytest/issues/11
 orig_fdopen = os.fdopen
 concurrencytest.os.fdopen = fdopen
 forker = concurrencytest.fork_for_tests(NWORKERS)
 return concurrencytest.ConcurrentTestSuite(suite, forker)
 
 @staticmethod
 def _split_suite(suite):
 serial = unittest.TestSuite()
 parallel = unittest.TestSuite()
 for test in suite:
 if test.countTestCases() == 0:
 continue
 elif isinstance(test, unittest.TestSuite):
 test_class = test._tests[0].__class__
 elif isinstance(test, unittest.TestCase):
 test_class = test
 else:
 raise TypeError("can't recognize type %r" % test)
 
 if getattr(test_class, '_serialrun', False):
 serial.addTest(test)
 else:
 parallel.addTest(test)
 return (serial, parallel)
 
 def run(self, suite):
 ser_suite, par_suite = self._split_suite(suite)
 par_suite = self._parallelize(par_suite)
 
 # run parallel
 cprint("starting parallel tests using %s workers" % NWORKERS,
 "green", bold=True)
 t = time.time()
 par = self._run(par_suite)
 par_elapsed = time.time() - t
 
 # At this point we should have N zombies (the workers), which
 # will disappear with wait().
 orphans = psutil.Process().children()
 gone, alive = psutil.wait_procs(orphans, timeout=1)
 if alive:
 cprint("alive processes %s" % alive, "red")
 reap_children()
 
 # run serial
 t = time.time()
 ser = self._run(ser_suite)
 ser_elapsed = time.time() - t
 
 # print
 if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
 par.printErrors()  # print them again at the bottom
 par_fails, par_errs, par_skips = map(len, (par.failures,
 par.errors,
 par.skipped))
 ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
 ser.errors,
 ser.skipped))
 print(textwrap.dedent("""
 +----------+----------+----------+----------+----------+----------+
 |          |    total | failures |   errors |  skipped |     time |
 +----------+----------+----------+----------+----------+----------+
 | parallel |      %3s |      %3s |      %3s |      %3s |    %.2fs |
 +----------+----------+----------+----------+----------+----------+
 | serial   |      %3s |      %3s |      %3s |      %3s |    %.2fs |
 +----------+----------+----------+----------+----------+----------+
 """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
 ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
 print("Ran %s tests in %.3fs using %s workers" % (
 par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
 ok = par.wasSuccessful() and ser.wasSuccessful()
 self._exit(ok)
 
 
 def get_runner(parallel=False):
 def warn(msg):
 cprint(msg + " Running serial tests instead.", "red")
 if parallel:
 if psutil.WINDOWS:
 warn("Can't run parallel tests on Windows.")
 elif concurrencytest is None:
 warn("concurrencytest module is not installed.")
 elif NWORKERS == 1:
 warn("Only 1 CPU available.")
 else:
 return ParallelRunner(verbosity=VERBOSITY)
 return ColouredTextRunner(verbosity=VERBOSITY)
 
 
 # Used by test_*,py modules.
 def run_from_name(name):
 if CI_TESTING:
 print_sysinfo()
 suite = TestLoader().from_name(name)
 runner = get_runner()
 runner.run(suite)
 
 
 def setup():
 psutil._set_debug(True)
 
 
 def main():
 setup()
 usage = "python3 -m psutil.tests [opts] [test-name]"
 parser = optparse.OptionParser(usage=usage, description="run unit tests")
 parser.add_option("--last-failed",
 action="store_true", default=False,
 help="only run last failed tests")
 parser.add_option("--parallel",
 action="store_true", default=False,
 help="run tests in parallel")
 opts, args = parser.parse_args()
 
 if not opts.last_failed:
 safe_rmpath(FAILED_TESTS_FNAME)
 
 # loader
 loader = TestLoader()
 if args:
 if len(args) > 1:
 parser.print_usage()
 return sys.exit(1)
 else:
 suite = loader.from_name(args[0])
 elif opts.last_failed:
 suite = loader.last_failed()
 else:
 suite = loader.all()
 
 if CI_TESTING:
 print_sysinfo()
 runner = get_runner(opts.parallel)
 runner.run(suite)
 
 
 if __name__ == '__main__':
 main()
 
 |