From a72d5cd02c9b0ae6c6cf3fcbc8407110c8d5494d Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jun 14 2017 14:14:34 +0000 Subject: [PATCH 1/8] Allow passing stdout and stderr to tests Signed-off-by: Patrick Uiterwijk --- diff --git a/tests/helpers/common.py b/tests/helpers/common.py index 09137fa..8199ed5 100755 --- a/tests/helpers/common.py +++ b/tests/helpers/common.py @@ -81,6 +81,8 @@ class IpsilonTestBase(object): self.testuser = pwd.getpwuid(os.getuid())[0] self.processes = [] self.allow_wrappers = allow_wrappers + self.stdout = None + self.stderr = None def platform_supported(self): """This return whether the current platform supports this test. @@ -116,7 +118,8 @@ class IpsilonTestBase(object): '-x509', '-nodes', '-subj', '/CN=Ipsilon Test CA', '-keyout', os.path.join(self.testdir, 'certs', 'root.key.pem'), '-out', os.path.join(self.testdir, 'certs', 'root.cert.pem')] - subprocess.check_call(cmd) + subprocess.check_call(cmd, + stdout=self.stdout, stderr=self.stderr) open(os.path.join(self.testdir, 'certs', 'db'), 'w').close() with open(os.path.join(self.testdir, 'certs', 'serial'), 'w') as ser: @@ -203,7 +206,8 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, '-out', '%s.csr' % certpath, '-keyout', keypath, '-subj', '/CN=Ipsilon Test %s' % name] - subprocess.check_call(cmd) + subprocess.check_call(cmd, + stdout=self.stdout, stderr=self.stderr) cmd = ['openssl', 'ca', '-batch', '-notext', '-days', '2', '-md', 'sha1', '-subj', '/CN=Ipsilon Test %s' % name, @@ -221,7 +225,8 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, # We just set it to a known value to make sure openssl doesn't # crash. ipaddr = '127.0.0.10' - subprocess.check_call(cmd, env={'ADDR': addr, 'IPADDR': ipaddr}) + subprocess.check_call(cmd, env={'ADDR': addr, 'IPADDR': ipaddr}, + stdout=self.stdout, stderr=self.stderr) def setup_idp_server(self, profile, name, addr, port, env): http_conf_file = self.setup_http(name, addr, port) @@ -233,7 +238,8 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, cmd = [os.path.join(self.rootdir, 'ipsilon/install/ipsilon-server-install'), '--config-profile=%s' % profile] - subprocess.check_call(cmd, env=env) + subprocess.check_call(cmd, env=env, + stdout=self.stdout, stderr=self.stderr) os.symlink(os.path.join(self.rootdir, 'ipsilon'), os.path.join(self.testdir, 'lib', name, 'ipsilon')) @@ -244,13 +250,15 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, cmd = [os.path.join(self.rootdir, 'ipsilon/install/ipsilon-client-install'), '--config-profile=%s' % profile] - subprocess.check_call(cmd, env=env) + subprocess.check_call(cmd, env=env, + stdout=self.stdout, stderr=self.stderr) return http_conf_file def setup_pgdb(self, datadir, env): cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir, '-o', '-E UNICODE'] - subprocess.check_call(cmd, env=env) + subprocess.check_call(cmd, env=env, + stdout=self.stdout, stderr=self.stderr) auth = 'host all all 127.0.0.1/24 trust\n' filename = os.path.join(datadir, 'pg_hba.conf') with open(filename, 'a') as f: @@ -267,7 +275,8 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, env['ETCD_INITIAL_ADVERTISE_PEER_URLS'] = 'http://%s:%s' % (addr, srvport) p = subprocess.Popen(['/usr/bin/etcd'], - env=env, preexec_fn=os.setsid) + env=env, preexec_fn=os.setsid, + stdout=self.stdout, stderr=self.stderr) self.processes.append(p) return p @@ -277,7 +286,8 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, env['REQUESTS_CA_BUNDLE'] = os.path.join(self.testdir, 'certs', 'root.cert.pem') p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf], - env=env, preexec_fn=os.setsid) + env=env, preexec_fn=os.setsid, + stdout=self.stdout, stderr=self.stderr) self.processes.append(p) return p @@ -286,13 +296,15 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, '-k %s -c port=%s -c \ listen_addresses=%s' % (rundir, port, addr), '-l', log, '-w'], - env=env, preexec_fn=os.setsid) + env=env, preexec_fn=os.setsid, + stdout=self.stdout, stderr=self.stderr) self.processes.append(p) p.wait() for d in ['adminconfig', 'users', 'transactions', 'sessions', 'saml2.sessions.db']: cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d] - subprocess.check_call(cmd, env=env) + subprocess.check_call(cmd, env=env, + stdout=self.stdout, stderr=self.stderr) def setup_ldap(self, env): ldapdir = os.path.join(self.testdir, 'ldap') @@ -304,14 +316,16 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, with open(filename, 'w+') as f: f.write(text) subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l', - 'tests/ldapdata.ldif'], env=env) + 'tests/ldapdata.ldif'], env=env, + stdout=self.stdout, stderr=self.stderr) return filename def start_ldap_server(self, conf, addr, port, env): p = subprocess.Popen(['/usr/sbin/slapd', '-d', '0', '-f', conf, '-h', 'ldap://%s:%s' % (addr, port)], - env=env, preexec_fn=os.setsid) + env=env, preexec_fn=os.setsid, + stdout=self.stdout, stderr=self.stderr) self.processes.append(p) def setup_kdc(self, env): @@ -358,7 +372,8 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, raise ValueError('KDC Setup failed') kdcproc = subprocess.Popen(['krb5kdc', '-n'], - env=kdcenv, preexec_fn=os.setsid) + env=kdcenv, preexec_fn=os.setsid, + stdout=self.stdout, stderr=self.stderr) self.processes.append(kdcproc) return kdcenv @@ -423,4 +438,5 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, exe = self.execname if exe.endswith('c'): exe = exe[:-1] - return subprocess.call([exe], env=env) + return subprocess.call([exe], env=env, + stdout=self.stdout, stderr=self.stderr) From c6f98eba7ee3e737decd9d5abc289f46b4745913 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jun 14 2017 14:14:34 +0000 Subject: [PATCH 2/8] Rework test framework The reworked framework is able to summarize test runs, and is less verbose. Signed-off-by: Patrick Uiterwijk --- diff --git a/tests/helpers/common.py b/tests/helpers/common.py index 8199ed5..e366db2 100755 --- a/tests/helpers/common.py +++ b/tests/helpers/common.py @@ -12,6 +12,8 @@ import random from string import Template import subprocess +from control import TC # pylint: disable=relative-import + WRAP_HOSTNAME = 'idp.ipsilon.dev' TESTREALM = 'IPSILON.DEV' @@ -81,6 +83,8 @@ class IpsilonTestBase(object): self.testuser = pwd.getpwuid(os.getuid())[0] self.processes = [] self.allow_wrappers = allow_wrappers + self.current_setup_step = None + self.print_cases = False self.stdout = None self.stderr = None @@ -89,9 +93,13 @@ class IpsilonTestBase(object): This is used for example with specific modules or features that are not supported on all platforms due to dependency availability. + + If the platform is supported, it returns None. + Otherwise it returns a string indicating why the platform does not + support the current test. """ # Every test defaults to being available on every platform - return True + return None def force_remove(self, op, name, info): os.chmod(name, 0700) @@ -317,7 +325,7 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, f.write(text) subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l', 'tests/ldapdata.ldif'], env=env, - stdout=self.stdout, stderr=self.stderr) + stdout=self.stdout, stderr=self.stderr) return filename @@ -434,9 +442,40 @@ basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, def setup_servers(self, env=None): raise NotImplementedError() + def setup_step(self, message): + """Method to inform setup step starting.""" + self.current_setup_step = message + def run(self, env): + """Method to run the test process and receive progress reports. + + The test process is run in a subprocess because it needs to be run with + the socket and nss wrappers, which are used a LD_PRELOAD, which means + the environment must be set before the process starts. + + The process running run() (Test Control process) communicates with the + Test Process by reading specially formatted strings from standard out. + + All lines read from the test's stdout will be passed into TC.get_result + to determine whether a test result was provided. + """ exe = self.execname if exe.endswith('c'): exe = exe[:-1] - return subprocess.call([exe], env=env, - stdout=self.stdout, stderr=self.stderr) + return self.run_and_collect([exe], env) + + def run_and_collect(self, cmd, env): + p = subprocess.Popen(cmd, env=env, + stdout=subprocess.PIPE, stderr=self.stderr) + results = [] + for line in p.stdout: + line = line[:-1] # Strip newline + result = TC.get_result(line) + if result: + if self.print_cases: + TC.output(result) + results.append(result) + else: + if self.stdout is None: + print(line) + return p.wait(), results diff --git a/tests/helpers/control.py b/tests/helpers/control.py new file mode 100644 index 0000000..9fb6c06 --- /dev/null +++ b/tests/helpers/control.py @@ -0,0 +1,98 @@ +# Copyright (C) 2017 Ipsilon project Contributors, for license see COPYING + +from __future__ import print_function + +import sys + + +class TC(object): + """Test Control helpers methods. + + This class is here to give methods short names, and users should not need + to instantiate classes. + """ + prefix = '**TEST**:' + output_method = print + + def __init__(self): + raise Exception("No need to initialize Test Control class instances") + + @staticmethod + def store_results(lst): + """Registers an output_method that adds results into lst.""" + @staticmethod + def putter(msg): + lst.append(msg) + TC.output_method = putter + + class case(object): + def __init__(self, name, should_fail=False): + self.name = name + self.should_fail = should_fail + + def __enter__(self): + TC.output_method(TC.prefix + 'start:' + self.name) + + def __exit__(self, exc_class, exc, tb): + if exc is None and not self.should_fail: + TC.output_method(TC.prefix + 'done') + elif not self.should_fail: + TC.output_method(TC.prefix + 'fail:' + repr(exc)) + sys.exit(1) + elif not exc: + TC.output_method(TC.prefix + 'fail:Should have failed') + sys.exit(1) + else: + # should_fail can either be True, in which any exception counts + # as pass, or it can be a string, in which case the string + # needs to occur in the str(exc) to count as a pass + failed_correctly = False + if self.should_fail is True: + failed_correctly = True + else: + failed_correctly = self.should_fail in str(exc) + + if failed_correctly: + TC.output_method(TC.prefix + 'done') + return True # Tell Python to swallow the exception + else: + TC.output_method(TC.prefix + 'fail:' + repr(exc)) + sys.exit(1) + + @staticmethod + def info(msg): + TC.output_method(TC.prefix + 'info:' + msg) + + @staticmethod + def fail(msg): + TC.output_method(TC.prefix + 'fail:' + msg) + sys.exit(1) + + @staticmethod + def get_result(line): + """Determines whether the line is a test case result. + + If the input line is a test case result, a tuple is returned with the + different result fields. If not, None is returned. + + The output tuple depends on the type of result: + case start: ('start', 'casename') + case done: ('done',) + case fail: ('fail', 'some error') + """ + if line.startswith(TC.prefix): + return tuple(line[len(TC.prefix):].split(':')) + else: + return None + + @staticmethod + def output(result): + """Prints the result tuple.""" + if result[0] == 'start': + print('Case %s... ' % result[1], end=' ') + elif result[0] == 'info': + print('Info: %s' % result[1]) + elif result[0] == 'done': + print('SUCCESS') + elif result[0] == 'fail': + print('FAILED: %s' % result[1]) diff --git a/tests/tests.py b/tests/tests.py index 53d5b8d..e5a688f 100755 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from __future__ import print_function @@ -8,33 +8,51 @@ __requires__ = ['sqlalchemy >= 0.8'] import pkg_resources # pylint: disable=unused-import import argparse -import inspect from ipsilon.util import plugin import os import sys import subprocess -import time -import traceback from helpers.common import WRAP_HOSTNAME # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import logger = None -class Tests(object): +VERBOSE_SHOWTESTS = 1 +VERBOSE_SHOWCASES = 2 +VERBOSE_SHOWOUTPUT = 3 - def __init__(self): - p = plugin.Plugins() - (pathname, dummy) = os.path.split(inspect.getfile(Tests)) - self.plugins = p.get_plugins(pathname, 'IpsilonTest') + +TEST_RESULT_SUCCESS = 0 +TEST_RESULT_SKIP = 1 +TEST_RESULT_FAIL = 2 +TEST_RESULT_EXCEPTION = 3 +TEST_RESULT_SETUP_FAILED = 4 + + +def get_tests(): + p = plugin.Plugins() + (pathname, _) = os.path.split(os.path.realpath(__file__)) + return p.get_plugins(pathname, 'IpsilonTest') def parse_args(): parser = argparse.ArgumentParser(description='Ipsilon Tests Environment') + parser.add_argument('--results-header', default='Test results:', + help='Test results header') parser.add_argument('--path', default='%s/testdir' % os.getcwd(), help="Directory in which tests are run") - parser.add_argument('--test', default='test1', - help="The test to run") + parser.add_argument('--fail-on-first-error', '-x', action='store_true', + help='Abort test run on first test failure') + parser.add_argument('--test', action='append', default=None, + help="Add a test to run") + parser.add_argument('--list-tests', '-L', action='store_true', + help='List all available tests') + parser.add_argument('--no-overview', '-q', action='store_true', + help='Suppress final summary') + parser.add_argument('--verbose', '-v', action='count', + help='Increase verbosity') parser.add_argument('--wrappers', default='auto', choices=['yes', 'no', 'auto'], help="Run the tests with socket wrappers") @@ -79,23 +97,17 @@ def try_wrappers(base, wrappers, allow_wrappers): return wenv -if __name__ == '__main__': - - args = parse_args() - - tests = Tests() - if args['test'] not in tests.plugins: - print("Unknown test [%s]" % args['test'], file=sys.stderr) - sys.exit(1) - test = tests.plugins[args['test']] +def run_test(testname, test, args): + supported = test.platform_supported() + if supported is not None: + return (TEST_RESULT_SKIP, supported) + if args['verbose'] <= VERBOSE_SHOWOUTPUT: + devnull = open(os.devnull, 'w') + test.stdout = devnull + test.stderr = devnull - if not test.platform_supported(): - print("Test %s not supported on platform" % args['test'], - file=sys.stderr) - sys.exit(0) - - if not os.path.exists(args['path']): - os.makedirs(args['path']) + if args['verbose'] >= VERBOSE_SHOWCASES: + test.print_cases = True test.setup_base(args['path'], test) @@ -103,19 +115,92 @@ if __name__ == '__main__': env['PYTHONPATH'] = test.rootdir env['TESTDIR'] = test.testdir + results = [] + post_setup = False + TC.store_results(results) try: test.setup_servers(env) + post_setup = True - code = test.run(env) + code, results = test.run(env) if code: - sys.exit(code) + return (TEST_RESULT_FAIL, code, results) except Exception as e: # pylint: disable=broad-except - print("Error: %s" % repr(e), file=sys.stderr) - traceback.print_exc(None, sys.stderr) - sys.exit(1) + if post_setup: + return (TEST_RESULT_EXCEPTION, e, results) + else: + return (TEST_RESULT_SETUP_FAILED, test.current_setup_step) finally: test.wait() - # Wait until all of the sockets are closed by the OS - time.sleep(0.5) - print("FINISHED") + return (TEST_RESULT_SUCCESS, results) + + +def result_to_str(result): + if result[0] == TEST_RESULT_SUCCESS: + return 'Test passed' + elif result[0] == TEST_RESULT_SKIP: + return 'Test skipped: %s' % result[1] + elif result[0] == TEST_RESULT_FAIL: + return 'Test failed with code %i' % result[1] + elif result[0] == TEST_RESULT_EXCEPTION: + return 'Test failed with error: %s' % repr(result[1]) + elif result[0] == TEST_RESULT_SETUP_FAILED: + return 'Test setup failed at step: %s' % result[1] + else: + return 'Unknown test result %s' % result[0] + + +def result_is_fail(result): + return result[0] not in (TEST_RESULT_SUCCESS, TEST_RESULT_SKIP) + + +def main(): + args = parse_args() + + tests = get_tests() + if args['list_tests']: + for testname in tests.keys(): + print(testname) + sys.exit(0) + + if args['test'] is None: + args['test'] = tests.keys() + unknown_tests = False + for test in args['test']: + if test not in tests: + unknown_tests = True + print("Unknown test [%s]" % test, file=sys.stderr) + if unknown_tests: + sys.exit(1) + args['test'] = set(args['test']) + + if not os.path.exists(args['path']): + os.makedirs(args['path']) + + test_results = {} + + for test in args['test']: + if args['verbose'] >= VERBOSE_SHOWTESTS: + print('Running test %s' % test) + result = run_test(test, tests[test], args) + test_results[test] = result + + if args['verbose'] >= VERBOSE_SHOWTESTS: + print(result_to_str(result)) + + if args['fail_on_first_error'] and result_is_fail(result): + break + + if not args['no_overview']: + print(args['results_header']) + for test in test_results: + print('{:15s} {}'.format(test, result_to_str(test_results[test]))) + + if any(result_is_fail(result) + for result in test_results.values()): + sys.exit(1) + + +if __name__ == '__main__': + main() From 850ff636347e715b53ff27e7e346ab206d3b287f Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jun 14 2017 14:14:34 +0000 Subject: [PATCH 3/8] Update tests to use new test framework Signed-off-by: Patrick Uiterwijk --- diff --git a/tests/attrs.py b/tests/attrs.py index b92048a..ab7ba4f 100755 --- a/tests/attrs.py +++ b/tests/attrs.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template @@ -87,17 +85,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('attrs', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing SP server") + self.setup_step("Installing SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -105,7 +103,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -119,28 +117,13 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(spname, 'https://127.0.0.11:45081') - print("attrs: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("attrs: Add SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP'): sess.add_sp_metadata(idpname, spname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("attrs: Access SP Protected Area Variables...", end=' ') - try: + with TC.case('Access SP Protected Area Variables'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/index.shtml') page.expected_value('text()', 'Test User %s' % user) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/authz.py b/tests/authz.py index 2ea461d..090ff0d 100755 --- a/tests/authz.py +++ b/tests/authz.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2016 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2016-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template idp_g = {'TEMPLATES': '${TESTDIR}/templates/install', @@ -93,17 +91,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('authz', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing first SP server") + self.setup_step("Installing first SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -111,10 +109,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting first SP's httpd server") + self.setup_step("Starting first SP's httpd server") self.start_http_server(conf, env) - print("Installing second SP server") + self.setup_step("Installing second SP server") name = 'sp2' addr = '127.0.0.12' port = '45082' @@ -122,7 +120,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting second SP's httpd server") + self.setup_step("Starting second SP's httpd server") self.start_http_server(conf, env) @@ -138,107 +136,56 @@ if __name__ == '__main__': sess.add_server(sp1name, 'https://127.0.0.11:45081') sess.add_server(sp2name, 'https://127.0.0.12:45082') - print("authz: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("authz: Add SP1 Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP1 Metadata to IdP'): sess.add_sp_metadata(idpname, sp1name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("authz: Add SP2 Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP2 Metadata to IdP'): sess.add_sp_metadata(idpname, sp2name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("authz: Access SP1 when authz stack set to allow ...", end=' ') - try: + with TC.case('Access SP1 when authz stack set to allow'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("authz: Set IDP authz stack to deny ...", end=' ') - try: + with TC.case('Set IdP authz stack to deny'): sess.disable_plugin(idpname, 'authz', 'allow') sess.enable_plugin(idpname, 'authz', 'deny') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") sess2 = HttpSessions() sess2.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess2.add_server(sp1name, 'https://127.0.0.11:45081') - print("authz: Fail access SP1 when authz stack set to deny, with " - "pre-auth ...", end=' ') - try: + with TC.case('Fail access to SP1 when authz stack set to deny, with ' + 'pre-auth'): sess2.auth_to_idp(idpname) page = sess2.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_status(401) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") sess3 = HttpSessions() sess3.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess3.add_server(sp1name, 'https://127.0.0.11:45081') - print("authz: Fail access SP1 when authz stack set to deny, without " - "pre-auth ...", end=' ') - try: + with TC.case('Fail access to SP1 with authz stack set to deny, without ' + 'pre-auth'): page = sess3.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_status(401) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("authz: Set IDP authz stack to spgroup ...", end=' ') - try: + with TC.case('Set IdP authz stack to spgroup'): sess.disable_plugin(idpname, 'authz', 'deny') sess.enable_plugin(idpname, 'authz', 'spgroup') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") sess4 = HttpSessions() sess4.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess4.add_server(sp1name, 'https://127.0.0.11:45081') sess4.add_server(sp2name, 'https://127.0.0.12:45082') - print("authz: Access SP1 when authz stack set to spgroup ...", end=' ') - try: + with TC.case('Access SP1 with authz stack set to spgroup'): sess4.auth_to_idp(idpname) page = sess4.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - - print("authz: Fail to access SP2 when authz stack set to spgroup ...", - end=' ') - try: + + with TC.case('Fail to access SP2 with authz stack set to spgroup'): page = sess4.fetch_page(idpname, 'https://127.0.0.12:45082/sp/') page.expected_status(401) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/dbupgrades.py b/tests/dbupgrades.py index fbddf63..b5a8046 100755 --- a/tests/dbupgrades.py +++ b/tests/dbupgrades.py @@ -1,10 +1,9 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd @@ -49,11 +48,10 @@ class IpsilonTest(IpsilonTestBase): db_name = 'adminconfig' test_db = os.path.join(db_outdir, '%s.sqlite' % db_name) p = subprocess.Popen(['/usr/bin/sqlite3', test_db, '.dump'], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=self.stderr) output, _ = p.communicate() if p.returncode: - print('Sqlite dump failed') - sys.exit(1) + TC.fail('Sqlite dump failed') return output def use_readonly_adminconfig(self, name): @@ -75,7 +73,7 @@ class IpsilonTest(IpsilonTestBase): def test_upgrade_from(self, env, old_version, with_readonly): # Setup IDP Server - print("Installing IDP server to test upgrade from %i" % old_version) + TC.info("Installing IDP server to test upgrade from %i" % old_version) name = 'idp_v%i' % old_version if with_readonly: name = name + '_readonly' @@ -105,7 +103,9 @@ class IpsilonTest(IpsilonTestBase): if database not in ['adminconfig', 'openid'] or not with_readonly: cmd = ['/usr/bin/sqlite3', db_out, '.read %s' % db_in] - subprocess.check_call(cmd) + subprocess.check_call(cmd, + stdout=self.stdout, + stderr=self.stderr) # Upgrade that database cmd = [os.path.join(self.rootdir, @@ -113,7 +113,8 @@ class IpsilonTest(IpsilonTestBase): cfgfile] subprocess.check_call(cmd, cwd=os.path.join(self.testdir, 'lib', name), - env=env) + env=env, + stdout=self.stdout, stderr=self.stderr) # Check some version-specific changes, to see if the upgrade went OK if old_version == 0: @@ -156,18 +157,29 @@ class IpsilonTest(IpsilonTestBase): exe.append('no-readonly') exe.append(name) exe.append('%s:%s' % (addr, port)) - exit_code = subprocess.call(exe, env=env) - if exit_code: - sys.exit(exit_code) + result = self.run_and_collect(exe, env=env) # Now kill the last http server os.killpg(http_server.pid, signal.SIGTERM) self.processes.remove(http_server) + return result + def run(self, env): + overall_exit_code = 0 + overall_results = [] + for version in range(ipsilon.util.data.CURRENT_SCHEMA_VERSION): for with_readonly in [True, False]: - self.test_upgrade_from(env, version, with_readonly) + exit_code, results = self.test_upgrade_from(env, + version, + with_readonly) + + if exit_code != 0: + overall_exit_code = 1 + overall_results.extend(results) + + return overall_exit_code, overall_results if __name__ == '__main__': @@ -182,12 +194,5 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://%s' % url, user, 'ipsilon') - print("dbupgrades: From v%s %s: Authenticate to IDP ..." % (from_version, - with_ro), - end=' ') - try: + with TC.case('From v%s %s: Authenticate to IdP' % (from_version, with_ro)): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/fconf.py b/tests/fconf.py index ec59d23..439a0af 100755 --- a/tests/fconf.py +++ b/tests/fconf.py @@ -1,15 +1,13 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import ConfigParser import os import pwd -import sys from string import Template import subprocess import uuid @@ -141,30 +139,31 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('fconf', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") idp = self.generate_profile(idp_g, idp_a, idpname, idpaddr, idpport) idpconf = self.setup_idp_server(idp, idpname, idpaddr, idpport, env) - print("Installing SP server") + self.setup_step("Installing SP server") sp = self.generate_profile(sp_g, sp_a, spname, spaddr, spport) spconf = self.setup_sp_server(sp, spname, spaddr, spport, env) fixup_sp_httpd(os.path.dirname(spconf)) fixup_idp_conf(self.testdir) - print("Testing database upgrade") + self.setup_step("Testing database upgrade") cfgfile = os.path.join(self.testdir, 'etc', idpname, 'ipsilon.conf') cmd = [os.path.join(self.rootdir, 'ipsilon/install/ipsilon-upgrade-database'), cfgfile] subprocess.check_call(cmd, cwd=os.path.join(self.testdir, 'lib', idpname), - env=env) + env=env, + stdout=self.stdout, stderr=self.stderr) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(idpconf, env) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(spconf, env) @@ -176,20 +175,10 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(spname, 'https://127.0.0.11:45081') - print("fconf: Access IdP Homepage ... ", end=' ') - try: + with TC.case('Access IdP homepage'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/') page.expected_value('//title/text()', 'Ipsilon') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("fconf: Access SP Protected Area ...", end=' ') - try: + with TC.case('Access SP protected area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/ldap.py b/tests/ldap.py index e3b6d5e..74feb1f 100755 --- a/tests/ldap.py +++ b/tests/ldap.py @@ -1,13 +1,11 @@ #!/usr/bin/python # -# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os -import sys from string import Template @@ -92,25 +90,25 @@ class IpsilonTest(IpsilonTestBase): def setup_servers(self, env=None): - print("Installing IDP's ldap server") + self.setup_step("Installing IDP's ldap server") addr = '127.0.0.10' port = '45389' conf = self.setup_ldap(env) - print("Starting IDP's ldap server") + self.setup_step("Starting IDP's ldap server") self.start_ldap_server(conf, addr, port, env) - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing SP server") + self.setup_step("Installing SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -118,7 +116,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -132,28 +130,13 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'tuser') sess.add_server(spname, 'https://127.0.0.11:45081') - print("ldap: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("ldap: Add SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP'): sess.add_sp_metadata(idpname, spname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("ldap: Access SP Protected Area ...", end=' ') - try: + with TC.case('Access SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/index.shtml') page.expected_value('text()', 'Test Group;Test Group 2') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/ldapdown.py b/tests/ldapdown.py index 764ca85..6b05ecd 100755 --- a/tests/ldapdown.py +++ b/tests/ldapdown.py @@ -1,15 +1,13 @@ #!/usr/bin/python # -# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING +# Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING # Test that we get a reasonable error back when the LDAP backend is down -from __future__ import print_function - from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os -import sys from string import Template @@ -94,24 +92,24 @@ class IpsilonTest(IpsilonTestBase): def setup_servers(self, env=None): - print("Installing IDP's ldap server") + self.setup_step("Installing IDP's ldap server") addr = '127.0.0.10' port = '45389' conf = self.setup_ldap(env) - print("Not starting IDP's ldap server") + self.setup_step("Not starting IDP's ldap server") - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing SP server") + self.setup_step("Installing SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -119,7 +117,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -133,14 +131,9 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'tuser') sess.add_server(spname, 'https://127.0.0.11:45081') - print("ldapdown: Authenticate to IDP with no LDAP backend...", end=' ') - try: + with TC.case('Authenticate to Idp with no LDAP backend'): sess.auth_to_idp( idpname, rule='//div[@class="alert alert-danger"]/p/text()', expected="Internal system error" ) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/openid.py b/tests/openid.py index 0261d21..41c2a38 100755 --- a/tests/openid.py +++ b/tests/openid.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys import inspect from string import Template @@ -62,17 +60,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('openid', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing first SP server") + self.setup_step("Installing first SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -81,7 +79,7 @@ class IpsilonTest(IpsilonTestBase): inspect.currentframe()))) fixup_sp_httpd(os.path.dirname(conf), testdir) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -95,103 +93,56 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(sp1name, 'https://127.0.0.11:45081') - print("openid: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openid: Run OpenID Protocol ...", end=' ') - try: + with TC.case('Run OpenID Protocol'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/?extensions=NO', require_consent=True) page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openid: Run OpenID Protocol without consent ...", end=' ') - try: + with TC.case('Run OpenID Protocol without consent'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/?extensions=NO', require_consent=False) page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openid: Revoking SP consent ...", end=' ') - try: + with TC.case('Revoking SP consent'): page = sess.revoke_all_consent(idpname) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openid: Run OpenID Protocol without consent ...", end=' ') - try: + with TC.case('Run OpenID Protocol without consent'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/?extensions=NO', require_consent=True) page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openid: Run OpenID Protocol with extensions ...", end=' ') - try: + with TC.case('Run OpenID PRotocol with extensions'): # We expect consent again because we added more attributes page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/?extensions=YES', require_consent=True) page.expected_value('text()', 'SUCCESS, WITH EXTENSIONS') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openid: Set IDP authz stack to deny ...", end=' ') - try: + with TC.case('Set IdP authz stack to deny'): sess.disable_plugin(idpname, 'authz', 'allow') sess.enable_plugin(idpname, 'authz', 'deny') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") sess2 = HttpSessions() sess2.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess2.add_server(sp1name, 'https://127.0.0.11:45081') - print("openid: Run OpenID Protocol with IDP deny, with pre-auth ...", - end=' ') - try: + with TC.case('Run OpenID Protocol with IdP deny, with pre-auth'): sess2.auth_to_idp(idpname) page = sess2.fetch_page(idpname, 'https://127.0.0.11:45081/?extensions=NO') page.expected_value('text()', 'ERROR: Cancelled') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") sess3 = HttpSessions() sess3.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess3.add_server(sp1name, 'https://127.0.0.11:45081') - print("openid: Run OpenID Protocol with IDP deny, without pre-auth ...", - end=' ') - try: + with TC.case('Run OpenID Protocol with IdP deny, without pre-auth'): page = sess3.fetch_page(idpname, 'https://127.0.0.11:45081/?extensions=NO') page.expected_value('text()', 'ERROR: Cancelled') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/openidc.py b/tests/openidc.py index 3347396..1225046 100755 --- a/tests/openidc.py +++ b/tests/openidc.py @@ -1,15 +1,13 @@ #!/usr/bin/python # -# Copyright (C) 2016 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2016-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import json import pwd -import sys import requests import hashlib from string import Template @@ -159,17 +157,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('openidc', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing first SP server") + self.setup_step("Installing first SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -177,10 +175,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting first SP's httpd server") + self.setup_step("Starting first SP's httpd server") self.start_http_server(conf, env) - print("Installing second SP server") + self.setup_step("Installing second SP server") name = 'sp2' addr = '127.0.0.12' port = '45082' @@ -188,10 +186,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting second SP's httpd server") + self.setup_step("Starting second SP's httpd server") self.start_http_server(conf, env) - print("Installing third SP server") + self.setup_step("Installing third SP server") name = 'sp3' addr = '127.0.0.13' port = '45083' @@ -199,7 +197,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting third SP's httpd server") + self.setup_step("Starting third SP's httpd server") self.start_http_server(conf, env) @@ -217,16 +215,10 @@ if __name__ == '__main__': sess.add_server(sp2name, 'https://127.0.0.12:45082') sess.add_server(sp3name, 'https://127.0.0.13:45083') - print("openidc: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Registering test client ...", end=' ') - try: + with TC.case('Registering test client'): client_info = { 'redirect_uris': ['https://invalid/'], 'response_types': ['code'], @@ -240,13 +232,8 @@ if __name__ == '__main__': json=client_info) r.raise_for_status() reg_resp = r.json() - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Registering test client with none auth ...", end=' ') - try: + with TC.case('Registering test client with none auth'): client_info = { 'redirect_uris': ['https://invalid/'], 'response_types': ['code'], @@ -260,13 +247,8 @@ if __name__ == '__main__': json=client_info) r.raise_for_status() reg_resp_none = r.json() - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Access first SP Protected Area ...", end=' ') - try: + with TC.case('Access first SP protected area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/', require_consent=True) h = hashlib.sha256() @@ -280,34 +262,17 @@ if __name__ == '__main__': 'acr': '0' } old_token = check_info_results(page.text, expect) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - - print("openidc: Log back in to first SP Protected Area without consent" - " ...", end=' ') - try: + + with TC.case('Log back in to first SP Protected Area without consent'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/redirect_uri?log' 'out=https%3A%2F%2F127.0.0.11%3A45081%2Fsp%2F', require_consent=False) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Revoking SP consent ...", end=' ') - try: + with TC.case('Revoking SP consent'): page = sess.revoke_all_consent(idpname) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - - print("openidc: Log back in to first SP Protected Area with consent ...", - end=' ') - try: + + with TC.case('Log back in to the first SP Protected Area with consent'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/redirect_uri?log' 'out=https%3A%2F%2F127.0.0.11%3A45081%2Fsp%2F', @@ -323,24 +288,14 @@ if __name__ == '__main__': 'acr': '0' } new_token = check_info_results(page.text, expect) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Update first SP client name ...", end=' ') - try: + with TC.case('Update first SP client name'): sess.update_options( idpname, 'providers/openidc/admin/client/%s' % reg_resp['client_id'], {'Client Name': 'Test suite client updated'}) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Retrieving token info ...", end=' ') - try: + with TC.case('Retrieving toke info'): # Testing token without client auth r = requests.post('https://127.0.0.10:45080/idp1/openidc/TokenInfo', data={'token': new_token['access_token']}) @@ -416,13 +371,8 @@ if __name__ == '__main__': 'client_secret': reg_resp['client_secret']}) if r.status_code != 400: raise Exception('Deleted client accepted') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Using none-authenticated client ...", end=' ') - try: + with TC.case('Using none-authenticated client'): # Test that none-authed clients don't have access to token info r = requests.post( 'https://127.0.0.10:45080/idp1/openidc/TokenInfo', @@ -478,13 +428,8 @@ if __name__ == '__main__': 'client_secret': reg_resp_none['client_secret']}) if r.status_code != 200: raise Exception('Authed client not accepted') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Checking user info ...", end=' ') - try: + with TC.case('Checking user info'): # Testing user info without token r = requests.post('https://127.0.0.10:45080/idp1/openidc/UserInfo') if r.status_code != 403: @@ -503,13 +448,8 @@ if __name__ == '__main__': h.update('testcase') if info['sub'] != h.hexdigest(): raise Exception('Sub claim invalid') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Access second SP Protected Area ...", end=' ') - try: + with TC.case('Access second SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.12:45082/sp/') expect = { 'sub': user, @@ -518,13 +458,8 @@ if __name__ == '__main__': 'acr': '0' } check_info_results(page.text, expect) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Access third SP Protected Area ...", end=' ') - try: + with TC.case('Access third SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.13:45083/sp/') h = hashlib.sha256() h.update('127.0.0.13') @@ -537,47 +472,28 @@ if __name__ == '__main__': 'acr': '0' } check_info_results(page.text, expect) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("openidc: Set IDP authz stack to deny", end=' ') - try: + with TC.case('Set IdP authz stack to deny'): sess.disable_plugin(idpname, 'authz', 'allow') sess.enable_plugin(idpname, 'authz', 'deny') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") sess2 = HttpSessions() sess2.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess2.add_server(sp1name, 'https://127.0.0.11:45081') - print("openidc: Access first SP Protected Area with IDP deny, with " - "pre-auth ...", end=' ') - try: + with TC.case('Access first SP Protected Area with IdP deny, with ' + 'pre-auth'): sess2.auth_to_idp(idpname) page = sess2.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') check_text_results(page.text, 'OpenID Connect Provider error: access_denied') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") sess3 = HttpSessions() sess3.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess3.add_server(sp1name, 'https://127.0.0.11:45081') - print("openidc: Access first SP Protected Area with IDP deny, without " - "pre-auth ...", end=' ') - try: + with TC.case('Access first SP Protected Area with IdP deny, without ' + 'pre-auth'): page = sess3.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') check_text_results(page.text, 'OpenID Connect Provider error: access_denied') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/pgdb.py b/tests/pgdb.py index 2d34755..11427c4 100755 --- a/tests/pgdb.py +++ b/tests/pgdb.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template @@ -94,7 +92,7 @@ class IpsilonTest(IpsilonTestBase): def setup_servers(self, env=None): - print("Installing IDP's database server") + self.setup_step("Installing IDP's database server") datadir = os.path.join(self.testdir, 'pgdata') rundir = self.testdir log = os.path.join(self.testdir, 'log/pgdb.log') @@ -102,20 +100,20 @@ class IpsilonTest(IpsilonTestBase): port = '45432' self.setup_pgdb(datadir, env) - print("Starting IDP's database server") + self.setup_step("Starting IDP's database server") self.start_pgdb_server(datadir, rundir, log, addr, port, env) - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing SP server") + self.setup_step("Installing SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -123,7 +121,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -137,44 +135,22 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(spname, 'https://127.0.0.11:45081') - print("pgdb: Authenticate to IDP ...", end=' ') - sys.stdout.flush() - try: - print('Stress-testing the database connections...', end=' ') - sys.stdout.flush() + with TC.case('Authenticate to IdP'): + # Stress-test database connections for i in xrange(50): sess.auth_to_idp(idpname) sess.logout_from_idp(idpname) sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("pgdb: Add SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP'): sess.add_sp_metadata(idpname, spname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("pgdb: Access SP Protected Area ...", end=' ') - try: + with TC.case('Access SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("pgdb: Logout from SP ...", end=' ') - try: + with TC.case('Logout from SP'): page = sess.fetch_page(idpname, '%s/%s?%s' % ( 'https://127.0.0.11:45081', 'saml2/logout', 'ReturnTo=https://127.0.0.11:45081/open/logged_out.html')) page.expected_value('text()', 'Logged out') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/test1.py b/tests/test1.py index f278101..135406b 100755 --- a/tests/test1.py +++ b/tests/test1.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template idp_g = {'TEMPLATES': '${TESTDIR}/templates/install', @@ -105,17 +103,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('test1', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing first SP server") + self.setup_step("Installing first SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -123,10 +121,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting first SP's httpd server") + self.setup_step("Starting first SP's httpd server") self.start_http_server(conf, env) - print("Installing second SP server") + self.setup_step("Installing second SP server") name = 'sp2-test.example.com' addr = '127.0.0.11' port = '45082' @@ -137,7 +135,7 @@ class IpsilonTest(IpsilonTestBase): os.remove(os.path.dirname(sp) + '/pw.txt') fixup_sp_httpd(os.path.dirname(conf)) - print("Starting second SP's httpd server") + self.setup_step("Starting second SP's httpd server") self.start_http_server(conf, env) @@ -153,72 +151,36 @@ if __name__ == '__main__': sess.add_server(sp1name, 'https://127.0.0.11:45081') sess.add_server(sp2name, 'https://127.0.0.11:45082') - print("test1: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IDP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("test1: Add first SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add first SP metadata to IDP'): sess.add_sp_metadata(idpname, sp1name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("test1: Access first SP Protected Area ...", end=' ') - try: + with TC.case('Access first AP protected area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("test1: Access second SP Protected Area ...", end=' ') - try: + with TC.case('Access second SP protected area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("test1: Update second SP ...", end=' ') - try: + with TC.case('Update second SP'): # This is a test to see whether we can update SAML SPs where the name # is an FQDN (includes hyphens and dots). See bug #196 sess.set_attributes_and_mapping(idpname, [], ['namefull', 'givenname', 'surname'], spname=sp2name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - print("test1: Try authentication failure ...", end=' ') newsess = HttpSessions() newsess.add_server(idpname, 'https://127.0.0.10:45080', user, 'wrong') - try: + with TC.case('Try authentication failure', should_fail=True): newsess.auth_to_idp(idpname) - print(" ERROR: Authentication should have failed", file=sys.stderr) - sys.exit(1) - except Exception as e: # pylint: disable=broad-except - print(" SUCCESS") - print("test1: Add keyless SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add keyless SP metadata to IDP'): sess.add_metadata(idpname, 'keyless', keyless_metadata) page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/admin/' 'providers/saml2/admin') page.expected_value('//div[@id="row_provider_http://keyless-sp"]/' '@title', 'WARNING: SP does not have signing keys!') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/testcleanup.py b/tests/testcleanup.py index bb55737..8fe27e6 100755 --- a/tests/testcleanup.py +++ b/tests/testcleanup.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys import sqlite3 from string import Template import time @@ -81,17 +79,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('testcleanup', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing SP server") + self.setup_step("Installing SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -99,7 +97,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting first SP's httpd server") + self.setup_step("Starting first SP's httpd server") self.start_http_server(conf, env) @@ -113,52 +111,25 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(sp1name, 'https://127.0.0.11:45081') - print("testcleanup: Verify logged out state ...", end=' ') - try: + with TC.case('Verify logged out state'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/') page.expected_value('//div[@id="content"]/p/a/text()', 'Log In') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testcleanup: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticating to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testcleanup: Add SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP'): sess.add_sp_metadata(idpname, sp1name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testcleanup: Access first SP Protected Area ...", end=' ') - try: + with TC.case('Access first SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testcleanup: Verify logged in state ...", end=' ') - try: + with TC.case('Verify logged in state'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/') page.expected_value('//div[@id="content"]/p/a/text()', None) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - - print("testcleanup: Checking that SAML2 sessions were created ...", - end=' ') - try: + + with TC.case('Checking that SAML2 sessions were created'): sess_db = os.path.join(os.environ['TESTDIR'], 'lib/idp1/saml2.sessions.db.sqlite') conn = sqlite3.connect(sess_db) @@ -167,29 +138,17 @@ if __name__ == '__main__': if len(cur.fetchall()) == 0: raise ValueError('SAML2 sessions not created') conn.close() - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") # Sessions are valid for six seconds, and we clean up once per minute. # However, checking after a minute is kinda cutting it close, so we add ten # seconds to make sure the system has had time to clean up. - print("Waiting a minute for cleanup to happen ...") time.sleep(70) - print("testcleanup: Verify logged out state ...", end=' ') - try: + with TC.case('Verify logged out state'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/') page.expected_value('//div[@id="content"]/p/a/text()', 'Log In') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - - print("testcleanup: Checking that SAML2 sessions were destroyed ...", - end=' ') - try: + + with TC.case('Checking that SAML2 sessions were destroyed'): sess_db = os.path.join(os.environ['TESTDIR'], 'lib/idp1/saml2.sessions.db.sqlite') conn = sqlite3.connect(sess_db) @@ -197,7 +156,3 @@ if __name__ == '__main__': cur.execute('SELECT * FROM saml2_sessions;') if len(cur.fetchall()) != 0: raise ValueError('SAML2 sessions left behind: %s' % cur.fetchall()) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/testetcd.py b/tests/testetcd.py index e858569..00e0688 100755 --- a/tests/testetcd.py +++ b/tests/testetcd.py @@ -2,13 +2,11 @@ # # Copyright (C) 2017 Ipsilon project Contributors, for license see COPYING -from __future__ import print_function - from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys import subprocess from string import Template @@ -118,25 +116,21 @@ class IpsilonTest(IpsilonTestBase): stdout=subprocess.PIPE) stdout, _ = p.communicate() except OSError: - print("No etcd installed") - return False + return 'No etcd installed' if not p.wait() == 0: - print('No etcd installed') - return False + return 'No etcd installed' # Example line: etcd Version: 3.0.13 if int(stdout.split('\n')[0].split(': ')[1][0]) < 3: - print('Etcd version < 3.0') - return False + return 'Etcd version < 3.0' try: import etcd # pylint: disable=unused-variable,import-error except ImportError: - print('No python-etcd available') - return False - return True + return 'No python-etcd available' + return None def setup_servers(self, env=None): - print("Starting IDP's etcd server") + self.setup_step("Starting IDP's etcd server") datadir = os.path.join(self.testdir, 'etcd') os.mkdir(datadir) addr = '127.0.0.10' @@ -144,17 +138,17 @@ class IpsilonTest(IpsilonTestBase): srvport = '42380' self.start_etcd_server(datadir, addr, clientport, srvport, env) - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing first SP server") + self.setup_step("Installing first SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -162,10 +156,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting first SP's httpd server") + self.setup_step("Starting first SP's httpd server") self.start_http_server(conf, env) - print("Installing second SP server") + self.setup_step("Installing second SP server") name = 'sp2-test.example.com' addr = '127.0.0.11' port = '45082' @@ -176,7 +170,7 @@ class IpsilonTest(IpsilonTestBase): os.remove(os.path.dirname(sp) + '/pw.txt') fixup_sp_httpd(os.path.dirname(conf)) - print("Starting second SP's httpd server") + self.setup_step("Starting second SP's httpd server") self.start_http_server(conf, env) @@ -192,72 +186,41 @@ if __name__ == '__main__': sess.add_server(sp1name, 'https://127.0.0.11:45081') sess.add_server(sp2name, 'https://127.0.0.11:45082') - print("etcd: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("etcd: Add first SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add first SP Metadata to IdP'): sess.add_sp_metadata(idpname, sp1name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("etcd: Access first SP Protected Area ...", end=' ') - try: + with TC.case('Access first SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("etcd: Access second SP Protected Area ...", end=' ') - try: + with TC.case('Access second SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("etcd: Update second SP ...", end=' ') - try: + with TC.case('Update Second SP'): # This is a test to see whether we can update SAML SPs where the name # is an FQDN (includes hyphens and dots). See bug #196 sess.set_attributes_and_mapping(idpname, [], ['namefull', 'givenname', 'surname'], spname=sp2name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - print("etcd: Try authentication failure ...", end=' ') newsess = HttpSessions() newsess.add_server(idpname, 'https://127.0.0.10:45080', user, 'wrong') - try: - newsess.auth_to_idp(idpname) - print(" ERROR: Authentication should have failed", file=sys.stderr) - sys.exit(1) - except Exception as e: # pylint: disable=broad-except - print(" SUCCESS") - - print("etcd: Add keyless SP Metadata to IDP ...", end=' ') - try: + with TC.case('Try authentication failure'): + try: + newsess.auth_to_idp(idpname) + except Exception as e: # pylint: disable=broad-except + pass + else: + raise ValueError('Authentication should have failed') + + with TC.case('Add keyless SP Metadata to IdP'): sess.add_metadata(idpname, 'keyless', keyless_metadata) page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/admin/' 'providers/saml2/admin') page.expected_value('//div[@id="row_provider_http://keyless-sp"]/' '@title', 'WARNING: SP does not have signing keys!') - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/testgssapi.py b/tests/testgssapi.py index d93d86d..65eeb6a 100755 --- a/tests/testgssapi.py +++ b/tests/testgssapi.py @@ -1,15 +1,13 @@ #!/usr/bin/python # -# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import from helpers.common import WRAP_HOSTNAME # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template idp_g = {'TEMPLATES': '${TESTDIR}/templates/install', @@ -96,16 +94,16 @@ class IpsilonTest(IpsilonTestBase): def setup_servers(self, env=None): os.mkdir("%s/ccaches" % self.testdir) - print("Installing KDC server") + self.setup_step("Installing KDC server") kdcenv = self.setup_kdc(env) - print("Creating principals and keytabs") + self.setup_step("Creating principals and keytabs") self.setup_keys(kdcenv) - print("Getting a TGT") + self.setup_step("Getting a TGT") self.kinit_keytab(kdcenv) - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = 'idp.ipsilon.dev' port = '45080' @@ -113,10 +111,10 @@ class IpsilonTest(IpsilonTestBase): idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing first SP server") + self.setup_step("Installing first SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -124,10 +122,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting first SP's httpd server") + self.setup_step("Starting first SP's httpd server") self.start_http_server(conf, env) - print("Installing second SP server") + self.setup_step("Installing second SP server") name = 'sp2' addr = '127.0.0.11' port = '45082' @@ -138,7 +136,7 @@ class IpsilonTest(IpsilonTestBase): os.remove(os.path.dirname(sp) + '/pw.txt') fixup_sp_httpd(os.path.dirname(conf)) - print("Starting second SP's httpd server") + self.setup_step("Starting second SP's httpd server") self.start_http_server(conf, env) @@ -165,36 +163,16 @@ if __name__ == '__main__': sess.add_server(sp1name, 'https://127.0.0.11:45081') sess.add_server(sp2name, 'https://127.0.0.11:45082') - print("testgssapi: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname, krb=True) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testgssapi: Add first SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add first SP Metadata to IdP'): sess.add_sp_metadata(idpname, sp1name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testgssapi: Access first SP Protected Area ...", end=' ') - try: + with TC.case('Access first SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testgssapi: Access second SP Protected Area ...", end=' ') - try: + with TC.case('Access second SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/testlogout.py b/tests/testlogout.py index 5a28c7a..499bb53 100755 --- a/tests/testlogout.py +++ b/tests/testlogout.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template @@ -143,21 +141,21 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('testlogout', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) for spdata in splist: nameid = spdata['nameid'] addr = spdata['addr'] port = spdata['port'] - print("Installing SP server %s" % nameid) + self.setup_step("Installing SP server %s" % nameid) # Configure sp3 and sp4 for only HTTP Redirect to test # that a mix of SOAP and HTTP Redirect will play nice @@ -173,7 +171,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -189,188 +187,96 @@ if __name__ == '__main__': spurl = 'https://%s:%s' % (sp['addr'], sp['port']) sess.add_server(spname, spurl) - print("testlogout: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") for sp in splist: spname = sp['nameid'] - print("testlogout: Add SP Metadata for %s to IDP ..." % spname, - end=' ') - try: + with TC.case('Add SP Metadata for %s to IdP' % spname): sess.add_sp_metadata(idpname, spname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Logout without logging into SP ...", end=' ') - try: + with TC.case('Logout without logging into SP'): page = sess.fetch_page(idpname, '%s/%s?%s' % ( 'https://127.0.0.11:45081', 'saml2/logout', 'ReturnTo=https://127.0.0.11:45081/open/logged_out.html')) page.expected_value('text()', 'Logged out') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Access SP Protected Area ...", end=' ') - try: + with TC.case('Access SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Logout from SP ...", end=' ') - try: + with TC.case('Logout from SP'): page = sess.fetch_page(idpname, '%s/%s?%s' % ( 'https://127.0.0.11:45081', 'saml2/logout', 'ReturnTo=https://127.0.0.11:45081/open/logged_out.html')) page.expected_value('text()', 'Logged out') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Try logout again ...", end=' ') - try: + with TC.case('Try logout again'): page = sess.fetch_page(idpname, '%s/%s?%s' % ( 'https://127.0.0.11:45081', 'saml2/logout', 'ReturnTo=https://127.0.0.11:45081/open/logged_out.html')) page.expected_value('text()', 'Logged out') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Ensure logout ...", end=' ') - try: + with TC.case('Ensure logout'): ensure_logout(sess, idpname, 'https://127.0.0.11:45081/sp/') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") # Test logout from each of the SP's in the list to ensure that the # order of logout doesn't matter. for sporder in splist: - print("testlogout: Access SP Protected Area of each SP ...", end=' ') - for sp in splist: - spname = sp['nameid'] - spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port']) - try: + with TC.case('Access SP PRotected Area of all SPs'): + for sp in splist: + spname = sp['nameid'] + spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port']) page = sess.fetch_page(idpname, spurl) page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - - print("testlogout: Initiate logout from %s ..." % sporder['nameid'], - end=' ') - try: + + with TC.case('Initiate logout from %s' % sporder['nameid']): logouturl = 'https://%s:%s' % (sp['addr'], sp['port']) page = sess.fetch_page(idpname, '%s/%s?%s' % ( logouturl, 'saml2/logout', 'ReturnTo=https://127.0.0.11:45081/open/logged_out.html')) page.expected_value('text()', 'Logged out') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - - print("testlogout: Ensure logout of each SP ...", end=' ') - for sp in splist: - spname = sp['nameid'] - spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port']) - try: + + with TC.case('Ensure logout of each SP'): + for sp in splist: + spname = sp['nameid'] + spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port']) ensure_logout(sess, idpname, spurl) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") # Test IdP-initiated logout - print("testlogout: Access SP Protected Area of SP1...", end=' ') - try: + with TC.case('Access SP Protected area of SP1'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Access SP Protected Area of SP2...", end=' ') - try: + with TC.case('Access SP Protected Area of SP2'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Access the IdP...", end=' ') - try: + with TC.case('Access the IdP'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/%s' % idpname) page.expected_value('//div[@id="welcome"]/p/text()', 'Welcome %s!' % user) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: IdP-initiated logout ...", end=' ') - try: + with TC.case('IdP-initiated logout'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/%s/logout' % idpname) page.expected_value('//div[@id="content"]/p/a/text()', 'Log In') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Ensure logout of SP1 ...", end=' ') - try: + with TC.case('Ensure logout of SP1'): ensure_logout(sess, idpname, 'https://127.0.0.11:45081/sp/') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Ensure logout of SP2 ...", end=' ') - try: + with TC.case('Ensure logout of SP2'): ensure_logout(sess, idpname, 'https://127.0.0.11:45082/sp/') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: Access the IdP...", end=' ') - try: + with TC.case('Access the IdP'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/%s/login' % idpname) page.expected_value('//div[@id="welcome"]/p/text()', 'Welcome %s!' % user) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testlogout: IdP-initiated logout with no SP sessions...", end=' ') - try: + with TC.case('IdP-initiated logout with no SP sessions'): page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/%s/logout' % idpname) page.expected_value('//div[@id="logout"]/p//text()', 'Successfully logged out.') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") diff --git a/tests/testmapping.py b/tests/testmapping.py index b2f0c3c..c415c81 100755 --- a/tests/testmapping.py +++ b/tests/testmapping.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -*- coding: utf-8 -*- # -# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os -import sys import pwd from string import Template @@ -145,14 +143,14 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('testmapping', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) for spdata in sp_list: @@ -160,12 +158,12 @@ class IpsilonTest(IpsilonTestBase): port = spdata['port'] name = spdata['name'] - print("Installing SP server %s" % name) + self.setup_step("Installing SP server %s" % name) sp_prof = self.generate_profile(sp_g, sp_a, name, addr, str(port)) conf = self.setup_sp_server(sp_prof, name, addr, str(port), env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -182,24 +180,13 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(sp['name'], spurl) - print("testmapping: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testmapping: Add SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP'): sess.add_sp_metadata(idpname, sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - try: - print("testmapping: Test default mapping and attrs ...", end=' ') + with TC.case('Test default mapping and attrs'): expect = { 'NAME_ID': user, 'fullname': 'Test User %s' % user, @@ -209,25 +196,14 @@ if __name__ == '__main__': 'groups': user, } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testmapping: Set default global mapping ...", end=' ') - try: + with TC.case('Set default global mapping'): sess.set_attributes_and_mapping( idpname, [['*', '*'], ['fullname', 'namefull']]) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - try: - print("testmapping: Test global mapping ...", end=' ') + + with TC.case('Test global mapping'): expect = { 'fullname': 'Test User %s' % user, 'namefull': 'Test User %s' % user, @@ -237,52 +213,28 @@ if __name__ == '__main__': 'groups': user } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Set default allowed attributes ...", end=' ') - try: + + with TC.case('Set default allowed attributes'): sess.set_attributes_and_mapping( idpname, [], ['namefull', 'givenname', 'surname']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - try: - print("testmapping: Test global allowed attributes ...", end=' ') + + with TC.case('Test global allowed attributes'): expect = { 'namefull': 'Test User %s' % user, 'surname': user, 'givenname': u'Test User 一', } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Set SP allowed attributes ...", end=' ') - try: + + with TC.case('Set SP allowed attributes'): sess.set_attributes_and_mapping( idpname, [['*', '*']], ['wholename', 'givenname', 'surname', 'email', 'fullname'], sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Test SP allowed atributes ...", end=' ') - try: + + with TC.case('Test SP allowed attributes'): expect = { 'fullname': 'Test User %s' % user, 'surname': user, @@ -290,14 +242,8 @@ if __name__ == '__main__': 'email': '%s@example.com' % user, } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Set SP attribute mapping ...", end=' ') - try: + + with TC.case('Set SP attribute mapping'): sess.set_attributes_and_mapping( idpname, [['*', '*'], @@ -306,14 +252,8 @@ if __name__ == '__main__': 'surname', 'email', 'fullname'], sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Test SP attribute mapping ...", end=' ') - try: + + with TC.case('Test SP attribute mapping'): expect = { 'wholename': 'Test User %s' % user, 'fullname': 'Test User %s' % user, @@ -322,14 +262,8 @@ if __name__ == '__main__': 'email': '%s@example.com' % user, } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Set SP URL attribute mapping ...", end=' ') - try: + + with TC.case('Set SP URL attribute mapping'): sess.set_attributes_and_mapping( idpname, [['*', '*'], @@ -342,14 +276,8 @@ if __name__ == '__main__': 'email', 'fullname'], sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Test SP URL attribute mapping ...", end=' ') - try: + + with TC.case('Test SP URL attribute mapping'): expect = { 'http://localhost/SAML/Name': 'Test User %s' % user, 'https://localhost/SAML/Name': 'Test User %s' % user, @@ -359,42 +287,24 @@ if __name__ == '__main__': 'email': '%s@example.com' % user, } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Set SP explicit mapping ...", end=' ') - try: + + with TC.case('Set SP explicit mapping'): sess.set_attributes_and_mapping( idpname, [['fullname', 'wholename'], ['email', 'email']], ['wholename', 'email'], sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Test SP explicit mapping ...", end=' ') - try: + + with TC.case('Test SP explicit mapping'): expect = { 'wholename': 'Test User %s' % user, 'email': '%s@example.com' % user, 'NAME_ID': user, } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Set SP username mapping ...", end=' ') - try: + + with TC.case('Set SP username mapping'): sess.set_attributes_and_mapping( idpname, [['*', '*'], @@ -404,14 +314,8 @@ if __name__ == '__main__': 'surname', 'email', 'fullname'], sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Test SP username mapping ...", end=' ') - try: + + with TC.case('Test SP username mapping'): expect = { 'wholename': 'Test User %s' % user, 'fullname': 'Test User %s' % user, @@ -421,26 +325,14 @@ if __name__ == '__main__': 'NAME_ID': '%s@example.com' % user, } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Drop SP attribute mapping ...", end=' ') - try: + + with TC.case('Drop SP attribute mapping'): sess.set_attributes_and_mapping( idpname, [], ['givenname', 'surname', 'email', 'fullname'], sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Test SP attr mapping with default allowed...", end=' ') - try: + + with TC.case('Test SP attribute mapping with default allowed'): expect = { 'fullname': 'Test User %s' % user, 'surname': user, @@ -448,31 +340,14 @@ if __name__ == '__main__': 'email': '%s@example.com' % user, } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Drop SP allowed attributes ...", end=' ') - try: + + with TC.case('Drop SP allowed attributes'): sess.set_attributes_and_mapping(idpname, [], [], sp['name']) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testmapping: Test mapping, should be back to global...", end=' ') - try: + + with TC.case('Test mapping, should be back to global'): expect = { 'namefull': 'Test User %s' % user, 'surname': user, 'givenname': u'Test User 一', } check_info_plugin(sess, idpname, spurl, expect) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") diff --git a/tests/testnameid.py b/tests/testnameid.py index e32b343..39bdac0 100755 --- a/tests/testnameid.py +++ b/tests/testnameid.py @@ -1,17 +1,15 @@ #!/usr/bin/python # -# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import from helpers.common import WRAP_HOSTNAME # pylint: disable=relative-import from helpers.common import TESTREALM # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP import os import pwd -import sys import re from string import Template @@ -120,23 +118,23 @@ class IpsilonTest(IpsilonTestBase): def setup_servers(self, env=None): os.mkdir("%s/ccaches" % self.testdir) - print("Installing KDC server") + self.setup_step("Installing KDC server") kdcenv = self.setup_kdc(env) - print("Creating principals and keytabs") + self.setup_step("Creating principals and keytabs") self.setup_keys(kdcenv) - print("Getting a TGT") + self.setup_step("Getting a TGT") self.kinit_keytab(kdcenv) - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = WRAP_HOSTNAME port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") env.update(kdcenv) self.start_http_server(conf, env) @@ -144,14 +142,14 @@ class IpsilonTest(IpsilonTestBase): nameid = spdata['nameid'] addr = spdata['addr'] port = spdata['port'] - print("Installing SP server %s" % nameid) + self.setup_step("Installing SP server %s" % nameid) sp_prof = self.generate_profile( sp_g, sp_a, nameid, addr, str(port), nameid ) conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -161,15 +159,15 @@ if __name__ == '__main__': user = pwd.getpwuid(os.getuid())[0] expected = { - 'x509': False, # not supported - 'transient': True, - 'persistent': True, - 'windows': False, # not supported - 'encrypted': False, # not supported - 'kerberos': True, - 'email': True, - 'unspecified': True, - 'entity': False, # not supported + 'x509': True, # not supported + 'transient': False, + 'persistent': False, + 'windows': True, # not supported + 'encrypted': True, # not supported + 'kerberos': False, + 'email': False, + 'unspecified': False, + 'entity': True, # not supported } expected_re = { @@ -204,70 +202,41 @@ if __name__ == '__main__': 'ipsilon') sess.add_server(spname, spurl) - print("") - print("testnameid: Testing NameID format %s ..." % spname) + TC.info('Testing NameID format %s' % spname) if spname == 'kerberos': krb = True - print("testnameid: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname, krb=krb) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testnameid: Add SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP'): sess.add_sp_metadata(idpname, spname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testnameid: Set supported Name ID formats ...", end=' ') - try: + with TC.case('Set supported Name ID formats'): sess.set_sp_default_nameids(idpname, spname, [spname]) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testnameid: Access SP Protected Area ...", end=' ') - try: + with TC.case('Access SP Protected Area', + should_fail=bool(expected[spname])): page = sess.fetch_page(idpname, '%s/sp/' % spurl) if not re.match(expected_re[spname], page.text): raise ValueError( 'page did not contain expression %s' % expected_re[spname] ) - except ValueError as e: - if expected[spname]: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" OK, EXPECTED TO FAIL") - else: - print(" SUCCESS") - - print("testnameid: Try authentication failure ...", end=' ') + newsess = HttpSessions() newsess.add_server(idpname, 'https://%s:45080' % WRAP_HOSTNAME, user, 'wrong') - try: + with TC.case('Try authentication failure', should_fail=True): newsess.auth_to_idp(idpname) - print(" ERROR: Authentication should have failed", file=sys.stderr) - sys.exit(1) - except Exception as e: # pylint: disable=broad-except - print(" SUCCESS") # Ensure that transient names change with each authentication sp = get_sp_by_nameid(sp_list, 'transient') spname = sp['nameid'] spurl = 'https://%s:%s' % (sp['addr'], sp['port']) - print("") - print("testnameid: Testing NameID format %s ..." % spname) + TC.info('Testing NameID format %s' % spname) ids = [] for i in xrange(4): @@ -275,58 +244,33 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://%s:45080' % WRAP_HOSTNAME, user, 'ipsilon') sess.add_server(spname, spurl) - print("testnameid: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testnameid: Access SP ...", end=' ') - try: + + with TC.case('Acess SP'): page = sess.fetch_page(idpname, '%s/sp/' % spurl) t1 = page.text - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testnameid: Access SP again ...", end=' ') - try: + + with TC.case('Access SP again'): page = sess.fetch_page(idpname, '%s/sp/' % spurl) t2 = page.text - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testnameid: Ensure ID is consistent between requests ...", - end=' ') - if t1 != t2: - print(" ERROR: New ID between reqeusts", file=sys.stderr) - else: - print(" SUCCESS") + + with TC.case('Ensure ID is consistent between requests'): + if t1 != t2: + raise ValueError('Same ID between requests') ids.append(t1) - print("testnameid: Ensure uniqueness across sessions ...", end=' ') - if len(ids) != len(set(ids)): - print(" ERROR: IDs are not unique between sessions", file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") + with TC.case('Ensure uniqueness across sessions'): + if len(ids) != len(set(ids)): + raise ValueError('IDs are not unique between sessions') # Ensure that persistent names remain the same with each authentication sp = get_sp_by_nameid(sp_list, 'persistent') spname = sp['nameid'] spurl = 'https://%s:%s' % (sp['addr'], sp['port']) - print("") - print("testnameid: Testing NameID format %s ..." % spname) + TC.info('Testing NameID format %s' % spname) ids = [] for i in xrange(4): @@ -334,47 +278,23 @@ if __name__ == '__main__': sess.add_server(idpname, 'https://%s:45080' % WRAP_HOSTNAME, user, 'ipsilon') sess.add_server(spname, spurl) - print("testnameid: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testnameid: Access SP ...", end=' ') - try: + + with TC.case('Access SP'): page = sess.fetch_page(idpname, '%s/sp/' % spurl) t1 = page.text - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testnameid: Access SP again ...", end=' ') - try: + + with TC.case('Access SP again'): page = sess.fetch_page(idpname, '%s/sp/' % spurl) t2 = page.text - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - - print("testnameid: Ensure ID is consistent between requests ...", - end=' ') - if t1 != t2: - print(" ERROR: New ID between reqeusts", file=sys.stderr) - else: - print(" SUCCESS") + + with TC.case('Ensure ID is consistent between requests'): + if t1 != t2: + raise ValueError('New ID between requests') ids.append(t1) - print("testnameid: Ensure same ID across sessions ...", end=' ') - if len(set(ids)) != 1: - print(" ERROR: IDs are not the same between sessions", file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") + with TC.case('Ensure same ID across sessions'): + if len(set(ids)) != 1: + raise ValueError('IDs are not the same between sessions') diff --git a/tests/testrest.py b/tests/testrest.py index af7a4e0..f90f383 100755 --- a/tests/testrest.py +++ b/tests/testrest.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template @@ -101,17 +99,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('testrest', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing SP server") + self.setup_step("Installing SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -119,10 +117,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf), name) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) - print("Installing second SP server") + self.setup_step("Installing second SP server") name = 'sp2-test.example.com' addr = '127.0.0.10' port = '45082' @@ -130,10 +128,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp2, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf), name) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) - print("Installing third SP server") + self.setup_step("Installing third SP server") name = 'sp3_invalid' addr = '127.0.0.10' port = '45083' @@ -141,7 +139,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp3, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf), name) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -159,36 +157,20 @@ if __name__ == '__main__': sess.add_server(sp2name, 'https://127.0.0.10:45082') sess.add_server(sp3name, 'https://127.0.0.10:45083') - print("testrest: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testrest: List initial Service Providers via REST ...", end=' ') - try: + with TC.case('List initial Service Providers via REST'): result = sess.get_rest_sp(idpname) if len(result['result']) != 0: raise ValueError( 'Expected no SP and got %d' % len(result['result']) ) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testrest: Add SP Metadata to IDP via admin ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP via admin'): sess.add_sp_metadata(idpname, spname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testrest: List Service Providers via REST ...", end=' ') - try: + with TC.case('List Service Providers via REST'): result = sess.get_rest_sp(idpname) if len(result['result']) != 1: raise ValueError( @@ -199,33 +181,18 @@ if __name__ == '__main__': 'Expected %s and got %s' % (spname, result['result'][0].get('provider')) ) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testrest: Add Service Provider via REST ...", end=' ') - try: + with TC.case('Add Service Provider via REST'): sess.add_sp_metadata(idpname, sp2name, rest=True) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testrest: List Service Providers via REST ...", end=' ') - try: + with TC.case('List Service Providers via REST'): result = sess.get_rest_sp(idpname) if len(result['result']) != 2: raise ValueError( 'Expected 2 SPs and got %d' % len(result['result']) ) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testrest: List Specific Service Providers via REST ...", end=' ') - try: + with TC.case('List specific Service Providers via REST'): result = sess.get_rest_sp(idpname, spname) if len(result['result']) != 1: raise ValueError( @@ -248,61 +215,24 @@ if __name__ == '__main__': 'Expected %s and got %s' % (spname, result['result'][0].get('provider')) ) - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") # Now for some negative testing - print("testrest: Add illegally named Service Provider via REST ...", - end=' ') - try: + with TC.case('Add illegally named Service Provider via REST', + should_fail='[400]'): sess.add_sp_metadata(idpname, sp3name, rest=True) - except ValueError as e: - print(" SUCCESS") - else: - print("ERROR: " - "Adding SP with invalid name should have failed and it didn't", - file=sys.stderr) - sys.exit(1) - - print("testrest: Fetch non-existent REST endpoint ...", end=' ') - try: + + with TC.case('Fetch non-existent REST endpoint', + should_fail='(501)'): result = sess.fetch_rest_page( idpname, '/%s/rest/providers/saml2/notfound' % idpname ) - except ValueError as e: - if '(501)' not in e.message: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - else: - print("ERROR: should have returned a 404", file=sys.stderr) - sys.exit(1) - - print("testrest: Fetch non-existent SP via REST ...", end=' ') - try: + + with TC.case('Fetch non-existent SP via REST', + should_fail='(404)'): result = sess.get_rest_sp(idpname, 'foo') - except ValueError as e: - if '(404)' not in e.message: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - else: - print(" SUCCESS") - else: - print("ERROR: should have returned a 404", file=sys.stderr) - sys.exit(1) - - print("testrest: Re-add Service Provider via REST ...", end=' ') - try: + + with TC.case('Re-add Service Provider via REST', + should_fail='[400]'): sess.add_sp_metadata(idpname, sp2name, rest=True) - except ValueError as e: - print(" SUCCESS") - else: - print("ERROR: " - "Adding duplicate SP should have failed and it didn't", - file=sys.stderr) - sys.exit(1) diff --git a/tests/testroot.py b/tests/testroot.py index 211ffc0..ab46b5d 100755 --- a/tests/testroot.py +++ b/tests/testroot.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template idp_g = {'TEMPLATES': '${TESTDIR}/templates/install', @@ -105,17 +103,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('testroot', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'root' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing first SP server") + self.setup_step("Installing first SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -123,10 +121,10 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting first SP's httpd server") + self.setup_step("Starting first SP's httpd server") self.start_http_server(conf, env) - print("Installing second SP server") + self.setup_step("Installing second SP server") name = 'sp2-test.example.com' addr = '127.0.0.11' port = '45082' @@ -137,7 +135,7 @@ class IpsilonTest(IpsilonTestBase): os.remove(os.path.dirname(sp) + '/pw.txt') fixup_sp_httpd(os.path.dirname(conf)) - print("Starting second SP's httpd server") + self.setup_step("Starting second SP's httpd server") self.start_http_server(conf, env) @@ -153,46 +151,26 @@ if __name__ == '__main__': sess.add_server(sp1name, 'https://127.0.0.11:45081') sess.add_server(sp2name, 'https://127.0.0.11:45082') - print("testroot: Authenticate to IDP ...", end=' ') - try: + with TC.case('Authenticate to IdP'): sess.auth_to_idp(idpname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testroot: Add first SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add first SP Metadata to IdP'): sess.add_sp_metadata(idpname, sp1name) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testroot: Access first SP Protected Area ...", end=' ') - try: + with TC.case('Access first SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testroot: Access second SP Protected Area ...", end=' ') - try: + with TC.case('Access second SP Protected Area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("testroot: Try authentication failure ...", end=' ') newsess = HttpSessions() newsess.add_server(idpname, 'https://127.0.0.10:45080', user, 'wrong') - try: - newsess.auth_to_idp(idpname) - print(" ERROR: Authentication should have failed", file=sys.stderr) - sys.exit(1) - except Exception as e: # pylint: disable=broad-except - print(" SUCCESS") + with TC.case('Try authentication failure'): + try: + newsess.auth_to_idp(idpname) + except Exception as e: # pylint: disable=broad-except + pass + else: + raise ValueError('Authentication should have failed') diff --git a/tests/trans.py b/tests/trans.py index b2a153b..289f8a1 100755 --- a/tests/trans.py +++ b/tests/trans.py @@ -1,14 +1,12 @@ #!/usr/bin/python # -# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING - -from __future__ import print_function +# Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.control import TC # pylint: disable=relative-import from helpers.http import HttpSessions # pylint: disable=relative-import import os import pwd -import sys from string import Template @@ -78,17 +76,17 @@ class IpsilonTest(IpsilonTestBase): super(IpsilonTest, self).__init__('trans', __file__) def setup_servers(self, env=None): - print("Installing IDP server") + self.setup_step("Installing IDP server") name = 'idp1' addr = '127.0.0.10' port = '45080' idp = self.generate_profile(idp_g, idp_a, name, addr, port) conf = self.setup_idp_server(idp, name, addr, port, env) - print("Starting IDP's httpd server") + self.setup_step("Starting IDP's httpd server") self.start_http_server(conf, env) - print("Installing SP server") + self.setup_step("Installing SP server") name = 'sp1' addr = '127.0.0.11' port = '45081' @@ -96,7 +94,7 @@ class IpsilonTest(IpsilonTestBase): conf = self.setup_sp_server(sp, name, addr, port, env) fixup_sp_httpd(os.path.dirname(conf)) - print("Starting SP's httpd server") + self.setup_step("Starting SP's httpd server") self.start_http_server(conf, env) @@ -106,26 +104,16 @@ if __name__ == '__main__': spname = 'sp1' user = pwd.getpwuid(os.getuid())[0] - print("trans: Add SP Metadata to IDP ...", end=' ') - try: + with TC.case('Add SP Metadata to IdP'): sess = HttpSessions() sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(spname, 'https://127.0.0.11:45081') sess.auth_to_idp(idpname) sess.add_sp_metadata(idpname, spname) - except Exception as e: # pylint: disable=broad-except - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") - print("trans: Access SP Protected Area ...", end=' ') - try: + with TC.case('Access SP Protected Area'): sess = HttpSessions() sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon') sess.add_server(spname, 'https://127.0.0.11:45081') page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') - except ValueError as e: - print(" ERROR: %s" % repr(e), file=sys.stderr) - sys.exit(1) - print(" SUCCESS") From 7acda7d93226892fffb171004b4f1ff88d55f527 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jun 14 2017 14:14:34 +0000 Subject: [PATCH 4/8] Use automated test detection in Makefile Signed-off-by: Patrick Uiterwijk --- diff --git a/Makefile b/Makefile index 8083ff5..b37db0a 100644 --- a/Makefile +++ b/Makefile @@ -78,36 +78,11 @@ lp-test: ./tests pep8 --ignore=E121,E123,E126,E226,E24,E704,E402 tests -wrappers: - #rm -fr wrapdir - #mkdir wrapdir - #LD_PRELOAD=libsocket_wrapper.so - #SOCKET_WRAPPER_DIR=wrapdir - #SOCKET_WRAPPER_DEFAULT_IFACE=9 - TESTDIR := $(shell mktemp --directory /tmp/ipsilon-testdir.XXXXXXXX) -tests: wrappers +tests: echo "Testdir: $(TESTDIR)" - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=test1 - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testroot - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testlogout - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testnameid - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testrest - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testmapping - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testgssapi - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=attrs - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=trans - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=pgdb - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testetcd - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=fconf - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=ldap - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=ldapdown - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=openid - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=openidc - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=authz - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=dbupgrades - PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testcleanup + ./runtests --path=$(TESTDIR) -vv test: lp-test unittests tests diff --git a/runtests b/runtests new file mode 100755 index 0000000..e6e16c8 --- /dev/null +++ b/runtests @@ -0,0 +1,3 @@ +#!/bin/sh +export PYTHONPATH=. +exec python ./tests/tests.py "$@" From 37315bf842989b2444560375aaf6a8a2e8d9b37b Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jul 18 2017 13:17:41 +0000 Subject: [PATCH 5/8] Replace mutable default arguments Signed-off-by: Patrick Uiterwijk --- diff --git a/tests/helpers/http.py b/tests/helpers/http.py index 160e775..9b2c309 100755 --- a/tests/helpers/http.py +++ b/tests/helpers/http.py @@ -454,7 +454,7 @@ class HttpSessions(object): raise ValueError('Failed to post SP data [%s]' % repr(r)) # pylint: disable=dangerous-default-value - def set_attributes_and_mapping(self, idp, mapping=[], attrs=[], + def set_attributes_and_mapping(self, idp, mapping=None, attrs=None, spname=None): """ Set allowed attributes and mapping in the IDP or the SP. In the @@ -469,6 +469,10 @@ class HttpSessions(object): attrs is the list of attributes that will be allowed: ['fullname', 'givenname', 'surname'] """ + if mapping is None: + mapping = [] + if attrs is None: + attrs = [] idpsrv = self.servers[idp] idpuri = idpsrv['baseuri'] if spname: # per-SP setting @@ -560,7 +564,7 @@ class HttpSessions(object): if r.status_code != 200: raise ValueError('Failed to disable plugin [%s]' % repr(r)) - def set_plugin_order(self, idp, plugtype, order=[]): + def set_plugin_order(self, idp, plugtype, order=None): """ Set the order of the specified login stack plugin type. @@ -568,6 +572,9 @@ class HttpSessions(object): order must be a list of zero or more plugin names in order """ + if order is None: + order = [] + idpsrv = self.servers[idp] idpuri = idpsrv['baseuri'] From a8ac66e52ca84d2e5b8a2d4c698039a42c06c7c7 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jul 20 2017 08:03:11 +0000 Subject: [PATCH 6/8] tests.helpers: Allow limiting redirects and posting forms Signed-off-by: Patrick Uiterwijk --- diff --git a/tests/helpers/http.py b/tests/helpers/http.py index 9b2c309..58b0b3e 100755 --- a/tests/helpers/http.py +++ b/tests/helpers/http.py @@ -269,7 +269,7 @@ class HttpSessions(object): return ['post', url, {'data': params}] def fetch_page(self, idp, target_url, follow_redirect=True, krb=False, - require_consent=None, return_prefix=None): + require_consent=None, return_prefix=None, post_forms=True): """ Fetch a page and parse the response code to determine what to do next. @@ -289,8 +289,13 @@ class HttpSessions(object): action = 'get' args = {} seen_consent = False + redirects = 0 r = None + if follow_redirect is True: + # Just make sure we get to an end at some point + follow_redirect = 50 + while True: if return_prefix and url.startswith(return_prefix): if r: @@ -299,8 +304,9 @@ class HttpSessions(object): return None r = self.access(action, url, krb=krb, **args) if r.status_code == 303 or r.status_code == 302: - if not follow_redirect: + if follow_redirect is False or redirects >= follow_redirect: return PageTree(r) + redirects += 1 url = r.headers['location'] action = 'get' args = {} @@ -311,41 +317,48 @@ class HttpSessions(object): # Fall back, hopefully to testauth authentication. try: - (action, url, args) = self.handle_login_form(idp, page) - continue + if post_forms: + (action, url, args) = self.handle_login_form(idp, page) + continue except WrongPage: pass elif r.status_code == 200: page = PageTree(r) try: - (action, url, args) = self.handle_login_form(idp, page) - continue + if post_forms: + (action, url, args) = self.handle_login_form(idp, page) + continue except WrongPage: pass try: - (action, url, args) = self.handle_return_form(page) - continue + if post_forms: + (action, url, args) = self.handle_return_form(page) + continue except WrongPage: pass try: - (action, url, args) = self.handle_openid_consent_form(page) - seen_consent = True - continue + if post_forms: + (action, url, args) = self.handle_openid_consent_form( + page) + seen_consent = True + continue except WrongPage: pass try: - (action, url, args) = self.handle_openid_form(page) - continue + if post_forms: + (action, url, args) = self.handle_openid_form(page) + continue except WrongPage: pass try: - (action, url, args) = self.handle_openidc_form(page) - continue + if post_forms: + (action, url, args) = self.handle_openidc_form(page) + continue except WrongPage: pass From a6e1f308efece5258404db1b0540f5e7573b91f8 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jul 20 2017 08:03:11 +0000 Subject: [PATCH 7/8] tests.test1: Add test case for no RelayState Signed-off-by: Patrick Uiterwijk --- diff --git a/tests/test1.py b/tests/test1.py index 135406b..5e03c66 100755 --- a/tests/test1.py +++ b/tests/test1.py @@ -157,6 +157,22 @@ if __name__ == '__main__': with TC.case('Add first SP metadata to IDP'): sess.add_sp_metadata(idpname, sp1name) + with TC.case('Make sure we send no RelayState if none was requested'): + page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/', + follow_redirect=1) + # Cut off the RelayState + target = page.result.headers['Location'] + target = target[:target.find('&RelayState=')] + page = sess.fetch_page(idpname, target, post_forms=False) + data = sess.get_form_data(page, 'saml-response', ['name', 'value']) + if data[0] != 'https://127.0.0.11:45081/saml2/postResponse': + TC.fail('Incorrect form found') + if 'RelayState' in data[2]: + TC.fail('RelayState found in response form') + if len(data[3]) > 1 and data[3][1] == 'None': + TC.fail('RelayState of None sent') + # Note that at this point, we have not actually submitted the response + with TC.case('Access first AP protected area'): page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/') page.expected_value('text()', 'WORKS!') From f57085e3ca929c930cd8d2f526b49a9ff12283ec Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Jul 20 2017 08:03:11 +0000 Subject: [PATCH 8/8] saml2: Do not send a RelayState with POST binding if not requested Fixes: #283 Signed-off-by: Patrick Uiterwijk --- diff --git a/ipsilon/providers/saml2/auth.py b/ipsilon/providers/saml2/auth.py index ba30060..418e17d 100644 --- a/ipsilon/providers/saml2/auth.py +++ b/ipsilon/providers/saml2/auth.py @@ -380,13 +380,14 @@ class AuthenticateRequest(ProviderPageBase): elif login.protocolProfile == lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST: login.buildAuthnResponseMsg() self.debug('POSTing back to SP [%s]' % (login.msgUrl)) + fields = [[lasso.SAML2_FIELD_RESPONSE, login.msgBody]] + if login.msgRelayState is not None: + fields.append([lasso.SAML2_FIELD_RELAYSTATE, + login.msgRelayState]) context = { "title": 'Redirecting back to the web application', "action": login.msgUrl, - "fields": [ - [lasso.SAML2_FIELD_RESPONSE, login.msgBody], - [lasso.SAML2_FIELD_RELAYSTATE, login.msgRelayState], - ], + "fields": fields, "submit": 'Return to application', } return self._template('saml2/post_response.html', **context)