From 954ccfe6b18fac54d8f071002b6a61eb9929cfdc Mon Sep 17 00:00:00 2001
From: Patrick Uiterwijk
Date: Jul 30 2017 17:01:10 +0000
Subject: Rework test framework

The reworked framework is able to summarize test runs and is less verbose.

Signed-off-by: Patrick Uiterwijk
Reviewed-by: Howard Johnson

---

diff --git a/tests/helpers/common.py b/tests/helpers/common.py
index 8199ed5..e366db2 100755
--- a/tests/helpers/common.py
+++ b/tests/helpers/common.py
@@ -12,6 +12,8 @@ import random
 from string import Template
 import subprocess
 
+from control import TC  # pylint: disable=relative-import
+
 
 WRAP_HOSTNAME = 'idp.ipsilon.dev'
 TESTREALM = 'IPSILON.DEV'
@@ -81,6 +83,8 @@ class IpsilonTestBase(object):
         self.testuser = pwd.getpwuid(os.getuid())[0]
         self.processes = []
         self.allow_wrappers = allow_wrappers
+        self.current_setup_step = None
+        self.print_cases = False
         self.stdout = None
         self.stderr = None
 
@@ -89,9 +93,13 @@ class IpsilonTestBase(object):
 
         This is used for example with specific modules or features that are
        not supported on all platforms due to dependency availability.
+
+        If the platform is supported, it returns None.
+        Otherwise it returns a string indicating why the platform does not
+        support the current test.
         """
         # Every test defaults to being available on every platform
-        return True
+        return None
 
     def force_remove(self, op, name, info):
         os.chmod(name, 0700)
@@ -317,7 +325,7 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir,
             f.write(text)
         subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l',
                                'tests/ldapdata.ldif'], env=env,
-                               stdout=self.stdout, stderr=self.stderr)
+                              stdout=self.stdout, stderr=self.stderr)
 
         return filename
 
@@ -434,9 +442,40 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir,
     def setup_servers(self, env=None):
         raise NotImplementedError()
 
+    def setup_step(self, message):
+        """Record that a setup step is starting."""
+        self.current_setup_step = message
+
     def run(self, env):
+        """Method to run the test process and receive progress reports.
+
+        The test process is run in a subprocess because it needs to be run
+        with the socket and nss wrappers, which are used as LD_PRELOAD, so
+        the environment must be set before the process starts.
+
+        The process running run() (the Test Control process) communicates
+        with the Test Process by reading specially formatted strings from
+        standard out.
+
+        All lines read from the test's stdout will be passed into
+        TC.get_result to determine whether a test result was provided.
+        """
         exe = self.execname
         if exe.endswith('c'):
             exe = exe[:-1]
-        return subprocess.call([exe], env=env,
-                               stdout=self.stdout, stderr=self.stderr)
+        return self.run_and_collect([exe], env)
+
+    def run_and_collect(self, cmd, env):
+        p = subprocess.Popen(cmd, env=env,
+                             stdout=subprocess.PIPE, stderr=self.stderr)
+        results = []
+        for line in p.stdout:
+            line = line[:-1]  # Strip newline
+            result = TC.get_result(line)
+            if result:
+                if self.print_cases:
+                    TC.output(result)
+                results.append(result)
+            else:
+                if self.stdout is None:
+                    print(line)
+        return p.wait(), results
diff --git a/tests/helpers/control.py b/tests/helpers/control.py
new file mode 100644
index 0000000..9fb6c06
--- /dev/null
+++ b/tests/helpers/control.py
@@ -0,0 +1,98 @@
+# Copyright (C) 2017 Ipsilon project Contributors, for license see COPYING
+
+from __future__ import print_function
+
+import sys
+
+
+class TC(object):
+    """Test Control helper methods.
+
+    This class is here to give the methods short names; users should not
+    need to instantiate it.
+    """
+    prefix = '**TEST**:'
+    output_method = print
+
+    def __init__(self):
+        raise Exception("No need to initialize Test Control class instances")
+
+    @staticmethod
+    def store_results(lst):
+        """Registers an output_method that appends results to lst."""
+        @staticmethod
+        def putter(msg):
+            lst.append(msg)
+        TC.output_method = putter
+
+    class case(object):
+        def __init__(self, name, should_fail=False):
+            self.name = name
+            self.should_fail = should_fail
+
+        def __enter__(self):
+            TC.output_method(TC.prefix + 'start:' + self.name)
+
+        def __exit__(self, exc_class, exc, tb):
+            if exc is None and not self.should_fail:
+                TC.output_method(TC.prefix + 'done')
+            elif not self.should_fail:
+                TC.output_method(TC.prefix + 'fail:' + repr(exc))
+                sys.exit(1)
+            elif not exc:
+                TC.output_method(TC.prefix + 'fail:Should have failed')
+                sys.exit(1)
+            else:
+                # should_fail can either be True, in which case any exception
+                # counts as a pass, or it can be a string, in which case the
+                # string needs to occur in str(exc) to count as a pass
+                failed_correctly = False
+                if self.should_fail is True:
+                    failed_correctly = True
+                else:
+                    failed_correctly = self.should_fail in str(exc)
+
+                if failed_correctly:
+                    TC.output_method(TC.prefix + 'done')
+                    return True  # Tell Python to swallow the exception
+                else:
+                    TC.output_method(TC.prefix + 'fail:' + repr(exc))
+                    sys.exit(1)
+
+    @staticmethod
+    def info(msg):
+        TC.output_method(TC.prefix + 'info:' + msg)
+
+    @staticmethod
+    def fail(msg):
+        TC.output_method(TC.prefix + 'fail:' + msg)
+        sys.exit(1)
+
+    @staticmethod
+    def get_result(line):
+        """Determines whether the line is a test case result.
+
+        If the input line is a test case result, a tuple is returned with the
+        different result fields. If not, None is returned.
+
+        The output tuple depends on the type of result:
+            case start: ('start', 'casename')
+            case done: ('done',)
+            case fail: ('fail', 'some error')
+        """
+        if line.startswith(TC.prefix):
+            return tuple(line[len(TC.prefix):].split(':'))
+        else:
+            return None
+
+    @staticmethod
+    def output(result):
+        """Prints the result tuple."""
+        if result[0] == 'start':
+            print('Case %s... ' % result[1], end=' ')
+        elif result[0] == 'info':
+            print('Info: %s' % result[1])
+        elif result[0] == 'done':
+            print('SUCCESS')
+        elif result[0] == 'fail':
+            print('FAILED: %s' % result[1])
diff --git a/tests/tests.py b/tests/tests.py
index 53d5b8d..e5a688f 100755
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -1,6 +1,6 @@
 #!/usr/bin/python
 #
-# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
+# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING
 
 from __future__ import print_function
 
@@ -8,33 +8,51 @@ __requires__ = ['sqlalchemy >= 0.8']
 import pkg_resources  # pylint: disable=unused-import
 
 import argparse
-import inspect
 from ipsilon.util import plugin
 import os
 import sys
 import subprocess
-import time
-import traceback
 
 from helpers.common import WRAP_HOSTNAME  # pylint: disable=relative-import
+from helpers.control import TC  # pylint: disable=relative-import
 
 
 logger = None
 
 
-class Tests(object):
+VERBOSE_SHOWTESTS = 1
+VERBOSE_SHOWCASES = 2
+VERBOSE_SHOWOUTPUT = 3
 
-    def __init__(self):
-        p = plugin.Plugins()
-        (pathname, dummy) = os.path.split(inspect.getfile(Tests))
-        self.plugins = p.get_plugins(pathname, 'IpsilonTest')
+
+TEST_RESULT_SUCCESS = 0
+TEST_RESULT_SKIP = 1
+TEST_RESULT_FAIL = 2
+TEST_RESULT_EXCEPTION = 3
+TEST_RESULT_SETUP_FAILED = 4
+
+
+def get_tests():
+    p = plugin.Plugins()
+    (pathname, _) = os.path.split(os.path.realpath(__file__))
+    return p.get_plugins(pathname, 'IpsilonTest')
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description='Ipsilon Tests Environment')
+    parser.add_argument('--results-header', default='Test results:',
+                        help='Test results header')
     parser.add_argument('--path', default='%s/testdir' % os.getcwd(),
                         help="Directory in which tests are run")
-    parser.add_argument('--test', default='test1',
-                        help="The test to run")
+    parser.add_argument('--fail-on-first-error', '-x', action='store_true',
+                        help='Abort test run on first test failure')
+    parser.add_argument('--test', action='append', default=None,
+                        help="Add a test to run")
+    parser.add_argument('--list-tests', '-L', action='store_true',
+                        help='List all available tests')
+    parser.add_argument('--no-overview', '-q', action='store_true',
+                        help='Suppress final summary')
+    parser.add_argument('--verbose', '-v', action='count',
+                        help='Increase verbosity')
     parser.add_argument('--wrappers', default='auto',
                         choices=['yes', 'no', 'auto'],
                         help="Run the tests with socket wrappers")
@@ -79,23 +97,17 @@ def try_wrappers(base, wrappers, allow_wrappers):
     return wenv
 
 
-if __name__ == '__main__':
-
-    args = parse_args()
-
-    tests = Tests()
-    if args['test'] not in tests.plugins:
-        print("Unknown test [%s]" % args['test'], file=sys.stderr)
-        sys.exit(1)
-    test = tests.plugins[args['test']]
+def run_test(testname, test, args):
+    supported = test.platform_supported()
+    if supported is not None:
+        return (TEST_RESULT_SKIP, supported)
+    if args['verbose'] <= VERBOSE_SHOWOUTPUT:
+        devnull = open(os.devnull, 'w')
+        test.stdout = devnull
+        test.stderr = devnull
 
-    if not test.platform_supported():
-        print("Test %s not supported on platform" % args['test'],
-              file=sys.stderr)
-        sys.exit(0)
-
-    if not os.path.exists(args['path']):
-        os.makedirs(args['path'])
+    if args['verbose'] >= VERBOSE_SHOWCASES:
+        test.print_cases = True
 
     test.setup_base(args['path'], test)
 
@@ -103,19 +115,92 @@ if __name__ == '__main__':
     env['PYTHONPATH'] = test.rootdir
     env['TESTDIR'] = test.testdir
 
+    results = []
+    post_setup = False
+    TC.store_results(results)
     try:
         test.setup_servers(env)
+        post_setup = True
 
-        code = test.run(env)
+        code, results = test.run(env)
         if code:
-            sys.exit(code)
+            return (TEST_RESULT_FAIL, code, results)
     except Exception as e:  # pylint: disable=broad-except
-        print("Error: %s" % repr(e), file=sys.stderr)
-        traceback.print_exc(None, sys.stderr)
-        sys.exit(1)
+        if post_setup:
+            return (TEST_RESULT_EXCEPTION, e, results)
+        else:
+            return (TEST_RESULT_SETUP_FAILED, test.current_setup_step)
     finally:
         test.wait()
 
-    # Wait until all of the sockets are closed by the OS
-    time.sleep(0.5)
-    print("FINISHED")
+    return (TEST_RESULT_SUCCESS, results)
+
+
+def result_to_str(result):
+    if result[0] == TEST_RESULT_SUCCESS:
+        return 'Test passed'
+    elif result[0] == TEST_RESULT_SKIP:
+        return 'Test skipped: %s' % result[1]
+    elif result[0] == TEST_RESULT_FAIL:
+        return 'Test failed with code %i' % result[1]
+    elif result[0] == TEST_RESULT_EXCEPTION:
+        return 'Test failed with error: %s' % repr(result[1])
+    elif result[0] == TEST_RESULT_SETUP_FAILED:
+        return 'Test setup failed at step: %s' % result[1]
+    else:
+        return 'Unknown test result %s' % result[0]
+
+
+def result_is_fail(result):
+    return result[0] not in (TEST_RESULT_SUCCESS, TEST_RESULT_SKIP)
+
+
+def main():
+    args = parse_args()
+
+    tests = get_tests()
+    if args['list_tests']:
+        for testname in tests.keys():
+            print(testname)
+        sys.exit(0)
+
+    if args['test'] is None:
+        args['test'] = tests.keys()
+    unknown_tests = False
+    for test in args['test']:
+        if test not in tests:
+            unknown_tests = True
+            print("Unknown test [%s]" % test, file=sys.stderr)
+    if unknown_tests:
+        sys.exit(1)
+    args['test'] = set(args['test'])
+
+    if not os.path.exists(args['path']):
+        os.makedirs(args['path'])
+
+    test_results = {}
+
+    for test in args['test']:
+        if args['verbose'] >= VERBOSE_SHOWTESTS:
+            print('Running test %s' % test)
+        result = run_test(test, tests[test], args)
+        test_results[test] = result
+
+        if args['verbose'] >= VERBOSE_SHOWTESTS:
+            print(result_to_str(result))
+
+        if args['fail_on_first_error'] and result_is_fail(result):
+            break
+
+    if not args['no_overview']:
+        print(args['results_header'])
+        for test in test_results:
+            print('{:15s} {}'.format(test, result_to_str(test_results[test])))
+
+    if any(result_is_fail(result)
+           for result in test_results.values()):
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()
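
For illustration, here is a minimal sketch of what a test module written
against the reworked framework looks like. The case names and the exception
below are made up for this example; only the TC helpers (TC.case, TC.info,
TC.fail) and the '**TEST**:' marker protocol parsed by run_and_collect() and
TC.get_result() come from this patch, and the import assumes the example
module lives in tests/ next to tests.py.

    #!/usr/bin/python
    # Hypothetical example test, not part of this commit.
    from helpers.control import TC  # pylint: disable=relative-import

    if __name__ == '__main__':
        TC.info('starting example run')   # emits '**TEST**:info:...'

        with TC.case('addition works'):   # emits '**TEST**:start:addition works'
            # No exception is raised, so __exit__ emits '**TEST**:done'
            assert 1 + 1 == 2

        with TC.case('bad credentials rejected', should_fail='credentials'):
            # should_fail is a string here: the case only passes if that
            # string occurs in str() of the exception raised in the block.
            raise RuntimeError('invalid credentials')

        # An explicit failure would be reported with TC.fail('reason'),
        # which emits '**TEST**:fail:reason' and exits with code 1.

The Test Control process (tests.py) collects these markers from the test's
stdout, so running tests.py with --test <testname> -vv prints each case as it
starts and whether it passed, and -x aborts the run on the first failing test.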