#284 Fix passing relaystate "'None'" when no RelayState was in the request
Merged 6 years ago by puiterwijk. Opened 6 years ago by puiterwijk.
puiterwijk/ipsilon relaystatenonetest  into  master

file modified
+2 -27
@@ -78,36 +78,11 @@ 

  		   ./tests

  	pep8 --ignore=E121,E123,E126,E226,E24,E704,E402 tests

  

- wrappers:

- 	#rm -fr wrapdir

- 	#mkdir wrapdir

- 	#LD_PRELOAD=libsocket_wrapper.so

- 	#SOCKET_WRAPPER_DIR=wrapdir

- 	#SOCKET_WRAPPER_DEFAULT_IFACE=9

- 

  TESTDIR := $(shell mktemp --directory /tmp/ipsilon-testdir.XXXXXXXX)

  

- tests: wrappers

+ tests:

  	echo "Testdir: $(TESTDIR)"

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=test1

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testroot

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testlogout

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testnameid

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testrest

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testmapping

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testgssapi

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=attrs

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=trans

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=pgdb

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testetcd

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=fconf

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=ldap

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=ldapdown

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=openid

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=openidc

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=authz

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=dbupgrades

- 	PYTHONPATH=./ ./tests/tests.py --path=$(TESTDIR) --test=testcleanup

+ 	./runtests --path=$(TESTDIR) -vv

  

  test: lp-test unittests tests

  

@@ -380,13 +380,14 @@ 

          elif login.protocolProfile == lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST:

              login.buildAuthnResponseMsg()

              self.debug('POSTing back to SP [%s]' % (login.msgUrl))

+             fields = [[lasso.SAML2_FIELD_RESPONSE, login.msgBody]]

+             if login.msgRelayState is not None:

+                 fields.append([lasso.SAML2_FIELD_RELAYSTATE,

+                                login.msgRelayState])

              context = {

                  "title": 'Redirecting back to the web application',

                  "action": login.msgUrl,

-                 "fields": [

-                     [lasso.SAML2_FIELD_RESPONSE, login.msgBody],

-                     [lasso.SAML2_FIELD_RELAYSTATE, login.msgRelayState],

-                 ],

+                 "fields": fields,

                  "submit": 'Return to application',

              }

              return self._template('saml2/post_response.html', **context)

file added
+3
@@ -0,0 +1,3 @@ 

+ #!/bin/sh

+ export PYTHONPATH=.

+ exec python ./tests/tests.py "$@"

file modified
+9 -26
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  
@@ -87,17 +85,17 @@ 

          super(IpsilonTest, self).__init__('attrs', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -105,7 +103,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -119,28 +117,13 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess.add_server(spname, 'https://127.0.0.11:45081')

  

-     print("attrs: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("attrs: Add SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, spname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("attrs: Access SP Protected Area Variables...", end=' ')

-     try:

+     with TC.case('Access SP Protected Area Variables'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.11:45081/sp/index.shtml')

          page.expected_value('text()', 'Test User %s' % user)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+21 -74
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2016 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2016-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
@@ -93,17 +91,17 @@ 

          super(IpsilonTest, self).__init__('authz', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing first SP server")

+         self.setup_step("Installing first SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -111,10 +109,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting first SP's httpd server")

+         self.setup_step("Starting first SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing second SP server")

+         self.setup_step("Installing second SP server")

          name = 'sp2'

          addr = '127.0.0.12'

          port = '45082'
@@ -122,7 +120,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting second SP's httpd server")

+         self.setup_step("Starting second SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -138,107 +136,56 @@ 

      sess.add_server(sp1name, 'https://127.0.0.11:45081')

      sess.add_server(sp2name, 'https://127.0.0.12:45082')

  

-     print("authz: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("authz: Add SP1 Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP1 Metadata to IdP'):

          sess.add_sp_metadata(idpname, sp1name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("authz: Add SP2 Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP2 Metadata to IdP'):

          sess.add_sp_metadata(idpname, sp2name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("authz: Access SP1 when authz stack set to allow ...", end=' ')

-     try:

+     with TC.case('Access SP1 when authz stack set to allow'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("authz: Set IDP authz stack to deny ...", end=' ')

-     try:

+     with TC.case('Set IdP authz stack to deny'):

          sess.disable_plugin(idpname, 'authz', 'allow')

          sess.enable_plugin(idpname, 'authz', 'deny')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      sess2 = HttpSessions()

      sess2.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess2.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("authz: Fail access SP1 when authz stack set to deny, with "

-           "pre-auth ...", end=' ')

-     try:

+     with TC.case('Fail access to SP1 when authz stack set to deny, with '

+                  'pre-auth'):

          sess2.auth_to_idp(idpname)

          page = sess2.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_status(401)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      sess3 = HttpSessions()

      sess3.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess3.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("authz: Fail access SP1 when authz stack set to deny, without "

-           "pre-auth ...", end=' ')

-     try:

+     with TC.case('Fail access to SP1 with authz stack set to deny, without '

+                  'pre-auth'):

          page = sess3.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_status(401)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("authz: Set IDP authz stack to spgroup ...", end=' ')

-     try:

+     with TC.case('Set IdP authz stack to spgroup'):

          sess.disable_plugin(idpname, 'authz', 'deny')

          sess.enable_plugin(idpname, 'authz', 'spgroup')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      sess4 = HttpSessions()

      sess4.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess4.add_server(sp1name, 'https://127.0.0.11:45081')

      sess4.add_server(sp2name, 'https://127.0.0.12:45082')

  

-     print("authz: Access SP1 when authz stack set to spgroup ...", end=' ')

-     try:

+     with TC.case('Access SP1 with authz stack set to spgroup'):

          sess4.auth_to_idp(idpname)

          page = sess4.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

- 

-     print("authz: Fail to access SP2 when authz stack set to spgroup ...",

-           end=' ')

-     try:

+ 

+     with TC.case('Fail to access SP2 with authz stack set to spgroup'):

          page = sess4.fetch_page(idpname, 'https://127.0.0.12:45082/sp/')

          page.expected_status(401)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+26 -21
@@ -1,10 +1,9 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd
@@ -49,11 +48,10 @@ 

              db_name = 'adminconfig'

          test_db = os.path.join(db_outdir, '%s.sqlite' % db_name)

          p = subprocess.Popen(['/usr/bin/sqlite3', test_db, '.dump'],

-                              stdout=subprocess.PIPE)

+                              stdout=subprocess.PIPE, stderr=self.stderr)

          output, _ = p.communicate()

          if p.returncode:

-             print('Sqlite dump failed')

-             sys.exit(1)

+             TC.fail('Sqlite dump failed')

          return output

  

      def use_readonly_adminconfig(self, name):
@@ -75,7 +73,7 @@ 

  

      def test_upgrade_from(self, env, old_version, with_readonly):

          # Setup IDP Server

-         print("Installing IDP server to test upgrade from %i" % old_version)

+         TC.info("Installing IDP server to test upgrade from %i" % old_version)

          name = 'idp_v%i' % old_version

          if with_readonly:

              name = name + '_readonly'
@@ -105,7 +103,9 @@ 

                  if database not in ['adminconfig',

                                      'openid'] or not with_readonly:

                      cmd = ['/usr/bin/sqlite3', db_out, '.read %s' % db_in]

-                     subprocess.check_call(cmd)

+                     subprocess.check_call(cmd,

+                                           stdout=self.stdout,

+                                           stderr=self.stderr)

  

              # Upgrade that database

              cmd = [os.path.join(self.rootdir,
@@ -113,7 +113,8 @@ 

                     cfgfile]

              subprocess.check_call(cmd,

                                    cwd=os.path.join(self.testdir, 'lib', name),

-                                   env=env)

+                                   env=env,

+                                   stdout=self.stdout, stderr=self.stderr)

  

          # Check some version-specific changes, to see if the upgrade went OK

          if old_version == 0:
@@ -156,18 +157,29 @@ 

              exe.append('no-readonly')

          exe.append(name)

          exe.append('%s:%s' % (addr, port))

-         exit_code = subprocess.call(exe, env=env)

-         if exit_code:

-             sys.exit(exit_code)

+         result = self.run_and_collect(exe, env=env)

  

          # Now kill the last http server

          os.killpg(http_server.pid, signal.SIGTERM)

          self.processes.remove(http_server)

  

+         return result

+ 

      def run(self, env):

+         overall_exit_code = 0

+         overall_results = []

+ 

          for version in range(ipsilon.util.data.CURRENT_SCHEMA_VERSION):

              for with_readonly in [True, False]:

-                 self.test_upgrade_from(env, version, with_readonly)

+                 exit_code, results = self.test_upgrade_from(env,

+                                                             version,

+                                                             with_readonly)

+ 

+             if exit_code != 0:

+                 overall_exit_code = 1

+             overall_results.extend(results)

+ 

+         return overall_exit_code, overall_results

  

  

  if __name__ == '__main__':
@@ -182,12 +194,5 @@ 

      sess.add_server(idpname, 'https://%s' % url, user,

                      'ipsilon')

  

-     print("dbupgrades: From v%s %s: Authenticate to IDP ..." % (from_version,

-                                                                 with_ro),

-           end=' ')

-     try:

+     with TC.case('From v%s %s: Authenticate to IdP' % (from_version, with_ro)):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+11 -22
@@ -1,15 +1,13 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import ConfigParser

  import os

  import pwd

- import sys

  from string import Template

  import subprocess

  import uuid
@@ -141,30 +139,31 @@ 

          super(IpsilonTest, self).__init__('fconf', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          idp = self.generate_profile(idp_g, idp_a, idpname, idpaddr, idpport)

          idpconf = self.setup_idp_server(idp, idpname, idpaddr, idpport, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          sp = self.generate_profile(sp_g, sp_a, spname, spaddr, spport)

          spconf = self.setup_sp_server(sp, spname, spaddr, spport, env)

          fixup_sp_httpd(os.path.dirname(spconf))

  

          fixup_idp_conf(self.testdir)

  

-         print("Testing database upgrade")

+         self.setup_step("Testing database upgrade")

          cfgfile = os.path.join(self.testdir, 'etc', idpname, 'ipsilon.conf')

          cmd = [os.path.join(self.rootdir,

                              'ipsilon/install/ipsilon-upgrade-database'),

                 cfgfile]

          subprocess.check_call(cmd,

                                cwd=os.path.join(self.testdir, 'lib', idpname),

-                               env=env)

+                               env=env,

+                               stdout=self.stdout, stderr=self.stderr)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(idpconf, env)

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(spconf, env)

  

  
@@ -176,20 +175,10 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess.add_server(spname, 'https://127.0.0.11:45081')

  

-     print("fconf: Access IdP Homepage ... ", end=' ')

-     try:

+     with TC.case('Access IdP homepage'):

          page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/')

          page.expected_value('//title/text()', 'Ipsilon')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("fconf: Access SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access SP protected area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+70 -15
@@ -12,6 +12,8 @@ 

  from string import Template

  import subprocess

  

+ from control import TC  # pylint: disable=relative-import

+ 

  

  WRAP_HOSTNAME = 'idp.ipsilon.dev'

  TESTREALM = 'IPSILON.DEV'
@@ -81,15 +83,23 @@ 

          self.testuser = pwd.getpwuid(os.getuid())[0]

          self.processes = []

          self.allow_wrappers = allow_wrappers

+         self.current_setup_step = None

+         self.print_cases = False

+         self.stdout = None

+         self.stderr = None

  

      def platform_supported(self):

          """This returns whether the current platform supports this test.

  

          This is used for example with specific modules or features that are not

          supported on all platforms due to dependency availability.

+ 

+         If the platform is supported, it returns None.

+         Otherwise it returns a string indicating why the platform does not

+         support the current test.

          """

          # Every test defaults to being available on every platform

-         return True

+         return None

  

      def force_remove(self, op, name, info):

          os.chmod(name, 0700)
@@ -116,7 +126,8 @@ 

                 '-x509', '-nodes', '-subj', '/CN=Ipsilon Test CA',

                 '-keyout', os.path.join(self.testdir, 'certs', 'root.key.pem'),

                 '-out', os.path.join(self.testdir, 'certs', 'root.cert.pem')]

-         subprocess.check_call(cmd)

+         subprocess.check_call(cmd,

+                               stdout=self.stdout, stderr=self.stderr)

          open(os.path.join(self.testdir, 'certs', 'db'), 'w').close()

  

          with open(os.path.join(self.testdir, 'certs', 'serial'), 'w') as ser:
@@ -203,7 +214,8 @@ 

                 '-out', '%s.csr' % certpath,

                 '-keyout', keypath,

                 '-subj', '/CN=Ipsilon Test %s' % name]

-         subprocess.check_call(cmd)

+         subprocess.check_call(cmd,

+                               stdout=self.stdout, stderr=self.stderr)

          cmd = ['openssl', 'ca', '-batch', '-notext', '-days', '2',

                 '-md', 'sha1',

                 '-subj', '/CN=Ipsilon Test %s' % name,
@@ -221,7 +233,8 @@ 

              # We just set it to a known value to make sure openssl doesn't

              # crash.

              ipaddr = '127.0.0.10'

-         subprocess.check_call(cmd, env={'ADDR': addr, 'IPADDR': ipaddr})

+         subprocess.check_call(cmd, env={'ADDR': addr, 'IPADDR': ipaddr},

+                               stdout=self.stdout, stderr=self.stderr)

  

      def setup_idp_server(self, profile, name, addr, port, env):

          http_conf_file = self.setup_http(name, addr, port)
@@ -233,7 +246,8 @@ 

          cmd = [os.path.join(self.rootdir,

                              'ipsilon/install/ipsilon-server-install'),

                 '--config-profile=%s' % profile]

-         subprocess.check_call(cmd, env=env)

+         subprocess.check_call(cmd, env=env,

+                               stdout=self.stdout, stderr=self.stderr)

          os.symlink(os.path.join(self.rootdir, 'ipsilon'),

                     os.path.join(self.testdir, 'lib', name, 'ipsilon'))

  
@@ -244,13 +258,15 @@ 

          cmd = [os.path.join(self.rootdir,

                              'ipsilon/install/ipsilon-client-install'),

                 '--config-profile=%s' % profile]

-         subprocess.check_call(cmd, env=env)

+         subprocess.check_call(cmd, env=env,

+                               stdout=self.stdout, stderr=self.stderr)

  

          return http_conf_file

  

      def setup_pgdb(self, datadir, env):

          cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir, '-o', '-E UNICODE']

-         subprocess.check_call(cmd, env=env)

+         subprocess.check_call(cmd, env=env,

+                               stdout=self.stdout, stderr=self.stderr)

          auth = 'host all all 127.0.0.1/24 trust\n'

          filename = os.path.join(datadir, 'pg_hba.conf')

          with open(filename, 'a') as f:
@@ -267,7 +283,8 @@ 

          env['ETCD_INITIAL_ADVERTISE_PEER_URLS'] = 'http://%s:%s' % (addr,

                                                                      srvport)

          p = subprocess.Popen(['/usr/bin/etcd'],

-                              env=env, preexec_fn=os.setsid)

+                              env=env, preexec_fn=os.setsid,

+                              stdout=self.stdout, stderr=self.stderr)

          self.processes.append(p)

          return p

  
@@ -277,7 +294,8 @@ 

          env['REQUESTS_CA_BUNDLE'] = os.path.join(self.testdir, 'certs',

                                                   'root.cert.pem')

          p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf],

-                              env=env, preexec_fn=os.setsid)

+                              env=env, preexec_fn=os.setsid,

+                              stdout=self.stdout, stderr=self.stderr)

          self.processes.append(p)

          return p

  
@@ -286,13 +304,15 @@ 

                                '-k %s -c port=%s -c \

                                 listen_addresses=%s' % (rundir, port, addr),

                                '-l', log, '-w'],

-                              env=env, preexec_fn=os.setsid)

+                              env=env, preexec_fn=os.setsid,

+                              stdout=self.stdout, stderr=self.stderr)

          self.processes.append(p)

          p.wait()

          for d in ['adminconfig', 'users', 'transactions', 'sessions',

                    'saml2.sessions.db']:

              cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d]

-             subprocess.check_call(cmd, env=env)

+             subprocess.check_call(cmd, env=env,

+                                   stdout=self.stdout, stderr=self.stderr)

  

      def setup_ldap(self, env):

          ldapdir = os.path.join(self.testdir, 'ldap')
@@ -304,14 +324,16 @@ 

          with open(filename, 'w+') as f:

              f.write(text)

          subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l',

-                                'tests/ldapdata.ldif'], env=env)

+                                'tests/ldapdata.ldif'], env=env,

+                               stdout=self.stdout, stderr=self.stderr)

  

          return filename

  

      def start_ldap_server(self, conf, addr, port, env):

          p = subprocess.Popen(['/usr/sbin/slapd', '-d', '0', '-f', conf,

                               '-h', 'ldap://%s:%s' % (addr, port)],

-                              env=env, preexec_fn=os.setsid)

+                              env=env, preexec_fn=os.setsid,

+                              stdout=self.stdout, stderr=self.stderr)

          self.processes.append(p)

  

      def setup_kdc(self, env):
@@ -358,7 +380,8 @@ 

              raise ValueError('KDC Setup failed')

  

          kdcproc = subprocess.Popen(['krb5kdc', '-n'],

-                                    env=kdcenv, preexec_fn=os.setsid)

+                                    env=kdcenv, preexec_fn=os.setsid,

+                                    stdout=self.stdout, stderr=self.stderr)

          self.processes.append(kdcproc)

  

          return kdcenv
@@ -419,8 +442,40 @@ 

      def setup_servers(self, env=None):

          raise NotImplementedError()

  

+     def setup_step(self, message):

+         """Record the setup step that is about to start."""

+         self.current_setup_step = message

+ 

      def run(self, env):

+         """Method to run the test process and receive progress reports.

+ 

+         The test process is run in a subprocess because it needs to be run with

+         the socket and nss wrappers, which are used as LD_PRELOAD, which means

+         the environment must be set before the process starts.

+ 

+         The process running run() (Test Control process) communicates with the

+         Test Process by reading specially formatted strings from standard out.

+ 

+         All lines read from the test's stdout will be passed into TC.get_result

+         to determine whether a test result was provided.

+         """

          exe = self.execname

          if exe.endswith('c'):

              exe = exe[:-1]

-         return subprocess.call([exe], env=env)

+         return self.run_and_collect([exe], env)

+ 

+     def run_and_collect(self, cmd, env):

+         p = subprocess.Popen(cmd, env=env,

+                              stdout=subprocess.PIPE, stderr=self.stderr)

+         results = []

+         for line in p.stdout:

+             line = line[:-1]  # Strip newline

+             result = TC.get_result(line)

+             if result:

+                 if self.print_cases:

+                     TC.output(result)

+                 results.append(result)

+             else:

+                 if self.stdout is None:

+                     print(line)

+         return p.wait(), results

@@ -0,0 +1,98 @@ 

+ # Copyright (C) 2017 Ipsilon project Contributors, for license see COPYING

+ 

+ from __future__ import print_function

+ 

+ import sys

+ 

+ 

+ class TC(object):

+     """Test Control helper methods.

+ 

+     This class is here to give methods short names, and users should not need

+     to instantiate it.

+     """

+     prefix = '**TEST**:'

+     output_method = print

+ 

+     def __init__(self):

+         raise Exception("No need to initialize Test Control class instances")

+ 

+     @staticmethod

+     def store_results(lst):

+         """Registers an output_method that adds results into lst."""

+         @staticmethod

+         def putter(msg):

+             lst.append(msg)

+         TC.output_method = putter

+ 

+     class case(object):

+         def __init__(self, name, should_fail=False):

+             self.name = name

+             self.should_fail = should_fail

+ 

+         def __enter__(self):

+             TC.output_method(TC.prefix + 'start:' + self.name)

+ 

+         def __exit__(self, exc_class, exc, tb):

+             if exc is None and not self.should_fail:

+                 TC.output_method(TC.prefix + 'done')

+             elif not self.should_fail:

+                 TC.output_method(TC.prefix + 'fail:' + repr(exc))

+                 sys.exit(1)

+             elif not exc:

+                 TC.output_method(TC.prefix + 'fail:Should have failed')

+                 sys.exit(1)

+             else:

+                 # should_fail can either be True, in which case any exception

+                 # counts as pass, or it can be a string, in which case the

+                 # string needs to occur in the str(exc) to count as a pass

+                 failed_correctly = False

+                 if self.should_fail is True:

+                     failed_correctly = True

+                 else:

+                     failed_correctly = self.should_fail in str(exc)

+ 

+                 if failed_correctly:

+                     TC.output_method(TC.prefix + 'done')

+                     return True  # Tell Python to swallow the exception

+                 else:

+                     TC.output_method(TC.prefix + 'fail:' + repr(exc))

+                     sys.exit(1)

+ 

+     @staticmethod

+     def info(msg):

+         TC.output_method(TC.prefix + 'info:' + msg)

+ 

+     @staticmethod

+     def fail(msg):

+         TC.output_method(TC.prefix + 'fail:' + msg)

+         sys.exit(1)

+ 

+     @staticmethod

+     def get_result(line):

+         """Determines whether the line is a test case result.

+ 

+         If the input line is a test case result, a tuple is returned with the

+         different result fields. If not, None is returned.

+ 

+         The output tuple depends on the type of result:

+         case start: ('start', 'casename')

+         case done:  ('done',)

+         case fail:  ('fail', 'some error')

+         """

+         if line.startswith(TC.prefix):

+             return tuple(line[len(TC.prefix):].split(':'))

+         else:

+             return None

+ 

+     @staticmethod

+     def output(result):

+         """Prints the result tuple."""

+         if result[0] == 'start':

+             print('Case %s... ' % result[1], end=' ')

+         elif result[0] == 'info':

+             print('Info: %s' % result[1])

+         elif result[0] == 'done':

+             print('SUCCESS')

+         elif result[0] == 'fail':

+             print('FAILED: %s' % result[1])

file modified
+37 -17
@@ -269,7 +269,7 @@ 

          return ['post', url, {'data': params}]

  

      def fetch_page(self, idp, target_url, follow_redirect=True, krb=False,

-                    require_consent=None, return_prefix=None):

+                    require_consent=None, return_prefix=None, post_forms=True):

          """

          Fetch a page and parse the response code to determine what to do

          next.
@@ -289,8 +289,13 @@ 

          action = 'get'

          args = {}

          seen_consent = False

+         redirects = 0

          r = None

  

+         if follow_redirect is True:

+             # Just make sure we get to an end at some point

+             follow_redirect = 50

+ 

          while True:

              if return_prefix and url.startswith(return_prefix):

                  if r:
@@ -299,8 +304,9 @@ 

                      return None

              r = self.access(action, url, krb=krb, **args)

              if r.status_code == 303 or r.status_code == 302:

-                 if not follow_redirect:

+                 if follow_redirect is False or redirects >= follow_redirect:

                      return PageTree(r)

+                 redirects += 1

                  url = r.headers['location']

                  action = 'get'

                  args = {}
@@ -311,41 +317,48 @@ 

  

                  # Fall back, hopefully to testauth authentication.

                  try:

-                     (action, url, args) = self.handle_login_form(idp, page)

-                     continue

+                     if post_forms:

+                         (action, url, args) = self.handle_login_form(idp, page)

+                         continue

                  except WrongPage:

                      pass

              elif r.status_code == 200:

                  page = PageTree(r)

  

                  try:

-                     (action, url, args) = self.handle_login_form(idp, page)

-                     continue

+                     if post_forms:

+                         (action, url, args) = self.handle_login_form(idp, page)

+                         continue

                  except WrongPage:

                      pass

  

                  try:

-                     (action, url, args) = self.handle_return_form(page)

-                     continue

+                     if post_forms:

+                         (action, url, args) = self.handle_return_form(page)

+                         continue

                  except WrongPage:

                      pass

  

                  try:

-                     (action, url, args) = self.handle_openid_consent_form(page)

-                     seen_consent = True

-                     continue

+                     if post_forms:

+                         (action, url, args) = self.handle_openid_consent_form(

+                             page)

+                         seen_consent = True

+                         continue

                  except WrongPage:

                      pass

  

                  try:

-                     (action, url, args) = self.handle_openid_form(page)

-                     continue

+                     if post_forms:

+                         (action, url, args) = self.handle_openid_form(page)

+                         continue

                  except WrongPage:

                      pass

  

                  try:

-                     (action, url, args) = self.handle_openidc_form(page)

-                     continue

+                     if post_forms:

+                         (action, url, args) = self.handle_openidc_form(page)

+                         continue

                  except WrongPage:

                      pass

  
@@ -454,7 +467,7 @@ 

              raise ValueError('Failed to post SP data [%s]' % repr(r))

  

      # pylint: disable=dangerous-default-value

-     def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],

+     def set_attributes_and_mapping(self, idp, mapping=None, attrs=None,

                                     spname=None):

          """

          Set allowed attributes and mapping in the IDP or the SP. In the
@@ -469,6 +482,10 @@ 

          attrs is the list of attributes that will be allowed:

             ['fullname', 'givenname', 'surname']

          """

+         if mapping is None:

+             mapping = []

+         if attrs is None:

+             attrs = []

          idpsrv = self.servers[idp]

          idpuri = idpsrv['baseuri']

          if spname:  # per-SP setting
@@ -560,7 +577,7 @@ 

          if r.status_code != 200:

              raise ValueError('Failed to disable plugin [%s]' % repr(r))

  

-     def set_plugin_order(self, idp, plugtype, order=[]):

+     def set_plugin_order(self, idp, plugtype, order=None):

          """

          Set the order of the specified login stack plugin type.

  
@@ -568,6 +585,9 @@ 

  

          order must be a list of zero or more plugin names in order

          """

+         if order is None:

+             order = []

+ 

          idpsrv = self.servers[idp]

          idpuri = idpsrv['baseuri']

  

file modified
+11 -28
@@ -1,13 +1,11 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

- import sys

  from string import Template

  

  
@@ -92,25 +90,25 @@ 

  

      def setup_servers(self, env=None):

  

-         print("Installing IDP's ldap server")

+         self.setup_step("Installing IDP's ldap server")

          addr = '127.0.0.10'

          port = '45389'

          conf = self.setup_ldap(env)

  

-         print("Starting IDP's ldap server")

+         self.setup_step("Starting IDP's ldap server")

          self.start_ldap_server(conf, addr, port, env)

  

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -118,7 +116,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -132,28 +130,13 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'tuser')

      sess.add_server(spname, 'https://127.0.0.11:45081')

  

-     print("ldap: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("ldap: Add SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, spname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("ldap: Access SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access SP Protected Area'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.11:45081/sp/index.shtml')

          page.expected_value('text()', 'Test Group;Test Group 2')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+9 -16
@@ -1,15 +1,13 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING

+ # Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING

  

  # Test that we get a reasonable error back when the LDAP backend is down

  

- from __future__ import print_function

- 

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

- import sys

  from string import Template

  

  
@@ -94,24 +92,24 @@ 

  

      def setup_servers(self, env=None):

  

-         print("Installing IDP's ldap server")

+         self.setup_step("Installing IDP's ldap server")

          addr = '127.0.0.10'

          port = '45389'

          conf = self.setup_ldap(env)

  

-         print("Not starting IDP's ldap server")

+         self.setup_step("Not starting IDP's ldap server")

  

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -119,7 +117,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -133,14 +131,9 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'tuser')

      sess.add_server(spname, 'https://127.0.0.11:45081')

  

-     print("ldapdown: Authenticate to IDP with no LDAP backend...", end=' ')

-     try:

+     with TC.case('Authenticate to Idp with no LDAP backend'):

          sess.auth_to_idp(

              idpname,

              rule='//div[@class="alert alert-danger"]/p/text()',

              expected="Internal system error"

          )

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+15 -64
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  import inspect

  from string import Template

  
@@ -62,17 +60,17 @@ 

          super(IpsilonTest, self).__init__('openid', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing first SP server")

+         self.setup_step("Installing first SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -81,7 +79,7 @@ 

              inspect.currentframe())))

          fixup_sp_httpd(os.path.dirname(conf), testdir)

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -95,103 +93,56 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("openid: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openid: Run OpenID Protocol ...", end=' ')

-     try:

+     with TC.case('Run OpenID Protocol'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.11:45081/?extensions=NO',

                                 require_consent=True)

          page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openid: Run OpenID Protocol without consent ...", end=' ')

-     try:

+     with TC.case('Run OpenID Protocol without consent'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.11:45081/?extensions=NO',

                                 require_consent=False)

          page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openid: Revoking SP consent ...", end=' ')

-     try:

+     with TC.case('Revoking SP consent'):

          page = sess.revoke_all_consent(idpname)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openid: Run OpenID Protocol without consent ...", end=' ')

-     try:

+     with TC.case('Run OpenID Protocol without consent'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.11:45081/?extensions=NO',

                                 require_consent=True)

          page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openid: Run OpenID Protocol with extensions ...", end=' ')

-     try:

+     with TC.case('Run OpenID Protocol with extensions'):
          # We expect consent again because we added more attributes
          page = sess.fetch_page(idpname,
                                 'https://127.0.0.11:45081/?extensions=YES',
                                 require_consent=True)
          page.expected_value('text()', 'SUCCESS, WITH EXTENSIONS')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openid: Set IDP authz stack to deny ...", end=' ')

-     try:

+     with TC.case('Set IdP authz stack to deny'):

          sess.disable_plugin(idpname, 'authz', 'allow')

          sess.enable_plugin(idpname, 'authz', 'deny')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      sess2 = HttpSessions()

      sess2.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess2.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("openid: Run OpenID Protocol with IDP deny, with pre-auth ...",

-           end=' ')

-     try:

+     with TC.case('Run OpenID Protocol with IdP deny, with pre-auth'):

          sess2.auth_to_idp(idpname)

          page = sess2.fetch_page(idpname,

                                  'https://127.0.0.11:45081/?extensions=NO')

          page.expected_value('text()', 'ERROR: Cancelled')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      sess3 = HttpSessions()

      sess3.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess3.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("openid: Run OpenID Protocol with IDP deny, without pre-auth ...",

-           end=' ')

-     try:

+     with TC.case('Run OpenID Protocol with IdP deny, without pre-auth'):

          page = sess3.fetch_page(idpname,

                                  'https://127.0.0.11:45081/?extensions=NO')

          page.expected_value('text()', 'ERROR: Cancelled')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+30 -114
@@ -1,15 +1,13 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2016 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2016-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import json

  import pwd

- import sys

  import requests

  import hashlib

  from string import Template
@@ -159,17 +157,17 @@ 

          super(IpsilonTest, self).__init__('openidc', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing first SP server")

+         self.setup_step("Installing first SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -177,10 +175,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting first SP's httpd server")

+         self.setup_step("Starting first SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing second SP server")

+         self.setup_step("Installing second SP server")

          name = 'sp2'

          addr = '127.0.0.12'

          port = '45082'
@@ -188,10 +186,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting second SP's httpd server")

+         self.setup_step("Starting second SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing third SP server")

+         self.setup_step("Installing third SP server")

          name = 'sp3'

          addr = '127.0.0.13'

          port = '45083'
@@ -199,7 +197,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting third SP's httpd server")

+         self.setup_step("Starting third SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -217,16 +215,10 @@ 

      sess.add_server(sp2name, 'https://127.0.0.12:45082')

      sess.add_server(sp3name, 'https://127.0.0.13:45083')

  

-     print("openidc: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Registering test client ...", end=' ')

-     try:

+     with TC.case('Registering test client'):

          client_info = {

              'redirect_uris': ['https://invalid/'],

              'response_types': ['code'],
@@ -240,13 +232,8 @@ 

                            json=client_info)

          r.raise_for_status()

          reg_resp = r.json()

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Registering test client with none auth ...", end=' ')

-     try:

+     with TC.case('Registering test client with none auth'):

          client_info = {

              'redirect_uris': ['https://invalid/'],

              'response_types': ['code'],
@@ -260,13 +247,8 @@ 

                            json=client_info)

          r.raise_for_status()

          reg_resp_none = r.json()

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Access first SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access first SP protected area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/',

                                 require_consent=True)

          h = hashlib.sha256()
@@ -280,34 +262,17 @@ 

              'acr': '0'

          }

          old_token = check_info_results(page.text, expect)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

- 

-     print("openidc: Log back in to first SP Protected Area without consent"

-           " ...", end=' ')

-     try:

+ 

+     with TC.case('Log back in to first SP Protected Area without consent'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.11:45081/sp/redirect_uri?log'

                                 'out=https%3A%2F%2F127.0.0.11%3A45081%2Fsp%2F',

                                 require_consent=False)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Revoking SP consent ...", end=' ')

-     try:

+     with TC.case('Revoking SP consent'):

          page = sess.revoke_all_consent(idpname)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

- 

-     print("openidc: Log back in to first SP Protected Area with consent ...",

-           end=' ')

-     try:

+ 

+     with TC.case('Log back in to the first SP Protected Area with consent'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.11:45081/sp/redirect_uri?log'

                                 'out=https%3A%2F%2F127.0.0.11%3A45081%2Fsp%2F',
@@ -323,24 +288,14 @@ 

              'acr': '0'

          }

          new_token = check_info_results(page.text, expect)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Update first SP client name ...", end=' ')

-     try:

+     with TC.case('Update first SP client name'):

          sess.update_options(

              idpname,

              'providers/openidc/admin/client/%s' % reg_resp['client_id'],

              {'Client Name': 'Test suite client updated'})

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Retrieving token info ...", end=' ')

-     try:

+     with TC.case('Retrieving toke info'):

          # Testing token without client auth

          r = requests.post('https://127.0.0.10:45080/idp1/openidc/TokenInfo',

                            data={'token': new_token['access_token']})
@@ -416,13 +371,8 @@ 

                                  'client_secret': reg_resp['client_secret']})

          if r.status_code != 400:

              raise Exception('Deleted client accepted')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Using none-authenticated client ...", end=' ')

-     try:

+     with TC.case('Using none-authenticated client'):

          # Test that none-authed clients don't have access to token info

          r = requests.post(

              'https://127.0.0.10:45080/idp1/openidc/TokenInfo',
@@ -478,13 +428,8 @@ 

                    'client_secret': reg_resp_none['client_secret']})

          if r.status_code != 200:

              raise Exception('Authed client not accepted')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Checking user info ...", end=' ')

-     try:

+     with TC.case('Checking user info'):

          # Testing user info without token

          r = requests.post('https://127.0.0.10:45080/idp1/openidc/UserInfo')

          if r.status_code != 403:
@@ -503,13 +448,8 @@ 

          h.update('testcase')

          if info['sub'] != h.hexdigest():

              raise Exception('Sub claim invalid')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Access second SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access second SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.12:45082/sp/')

          expect = {

              'sub': user,
@@ -518,13 +458,8 @@ 

              'acr': '0'

          }

          check_info_results(page.text, expect)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Access third SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access third SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.13:45083/sp/')

          h = hashlib.sha256()

          h.update('127.0.0.13')
@@ -537,47 +472,28 @@ 

              'acr': '0'

          }

          check_info_results(page.text, expect)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("openidc: Set IDP authz stack to deny", end=' ')

-     try:

+     with TC.case('Set IdP authz stack to deny'):

          sess.disable_plugin(idpname, 'authz', 'allow')

          sess.enable_plugin(idpname, 'authz', 'deny')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      sess2 = HttpSessions()

      sess2.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess2.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("openidc: Access first SP Protected Area with IDP deny, with "

-           "pre-auth ...", end=' ')

-     try:

+     with TC.case('Access first SP Protected Area with IdP deny, with '

+                  'pre-auth'):

          sess2.auth_to_idp(idpname)

          page = sess2.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          check_text_results(page.text,

                             'OpenID Connect Provider error: access_denied')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      sess3 = HttpSessions()

      sess3.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess3.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("openidc: Access first SP Protected Area with IDP deny, without "

-           "pre-auth ...", end=' ')

-     try:

+     with TC.case('Access first SP Protected Area with IdP deny, without '

+                  'pre-auth'):

          page = sess3.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          check_text_results(page.text,

                             'OpenID Connect Provider error: access_denied')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+13 -37
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  
@@ -94,7 +92,7 @@ 

  

      def setup_servers(self, env=None):

  

-         print("Installing IDP's database server")

+         self.setup_step("Installing IDP's database server")

          datadir = os.path.join(self.testdir, 'pgdata')

          rundir = self.testdir

          log = os.path.join(self.testdir, 'log/pgdb.log')
@@ -102,20 +100,20 @@ 

          port = '45432'

          self.setup_pgdb(datadir, env)

  

-         print("Starting IDP's database server")

+         self.setup_step("Starting IDP's database server")

          self.start_pgdb_server(datadir, rundir, log, addr, port, env)

  

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -123,7 +121,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -137,44 +135,22 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess.add_server(spname, 'https://127.0.0.11:45081')

  

-     print("pgdb: Authenticate to IDP ...", end=' ')

-     sys.stdout.flush()

-     try:

-         print('Stress-testing the database connections...', end=' ')

-         sys.stdout.flush()

+     with TC.case('Authenticate to IdP'):

+         # Stress-test database connections

          for i in xrange(50):

              sess.auth_to_idp(idpname)

              sess.logout_from_idp(idpname)

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("pgdb: Add SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, spname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("pgdb: Access SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("pgdb: Logout from SP ...", end=' ')

-     try:

+     with TC.case('Logout from SP'):

          page = sess.fetch_page(idpname, '%s/%s?%s' % (

              'https://127.0.0.11:45081', 'saml2/logout',

              'ReturnTo=https://127.0.0.11:45081/open/logged_out.html'))

          page.expected_value('text()', 'Logged out')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+31 -53
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
@@ -105,17 +103,17 @@ 

          super(IpsilonTest, self).__init__('test1', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing first SP server")

+         self.setup_step("Installing first SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -123,10 +121,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting first SP's httpd server")

+         self.setup_step("Starting first SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing second SP server")

+         self.setup_step("Installing second SP server")

          name = 'sp2-test.example.com'

          addr = '127.0.0.11'

          port = '45082'
@@ -137,7 +135,7 @@ 

          os.remove(os.path.dirname(sp) + '/pw.txt')

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting second SP's httpd server")

+         self.setup_step("Starting second SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -153,72 +151,52 @@ 

      sess.add_server(sp1name, 'https://127.0.0.11:45081')

      sess.add_server(sp2name, 'https://127.0.0.11:45082')

  

-     print("test1: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IDP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("test1: Add first SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add first SP metadata to IDP'):

          sess.add_sp_metadata(idpname, sp1name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("test1: Access first SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Make sure we send no RelayState if none was requested'):

+         page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/',

+                                follow_redirect=1)

+         # Cut off the RelayState (split leaves the URL intact if it is absent)

+         target = page.result.headers['Location']

+         target = target.split('&RelayState=', 1)[0]

+         page = sess.fetch_page(idpname, target, post_forms=False)

+         data = sess.get_form_data(page, 'saml-response', ['name', 'value'])

+         if data[0] != 'https://127.0.0.11:45081/saml2/postResponse':

+             TC.fail('Incorrect form found')

+         if 'RelayState' in data[2]:

+             TC.fail('RelayState found in response form')

+         if len(data[3]) > 1 and data[3][1] == 'None':

+             TC.fail('RelayState of None sent')

+         # Note that at this point, we have not actually submitted the response

+ 

+     with TC.case('Access first SP protected area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("test1: Access second SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access second SP protected area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("test1: Update second SP ...", end=' ')

-     try:

+     with TC.case('Update second SP'):

          # This is a test to see whether we can update SAML SPs where the name

          # is an FQDN (includes hyphens and dots). See bug #196

          sess.set_attributes_and_mapping(idpname, [],

                                          ['namefull', 'givenname', 'surname'],

                                          spname=sp2name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

  

-     print("test1: Try authentication failure ...", end=' ')

      newsess = HttpSessions()

      newsess.add_server(idpname, 'https://127.0.0.10:45080', user, 'wrong')

-     try:

+     with TC.case('Try authentication failure', should_fail=True):

          newsess.auth_to_idp(idpname)

-         print(" ERROR: Authentication should have failed", file=sys.stderr)

-         sys.exit(1)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" SUCCESS")

  

-     print("test1: Add keyless SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add keyless SP metadata to IDP'):

          sess.add_metadata(idpname, 'keyless', keyless_metadata)

          page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/admin/'

                                          'providers/saml2/admin')

          page.expected_value('//div[@id="row_provider_http://keyless-sp"]/'

                              '@title',

                              'WARNING: SP does not have signing keys!')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+16 -61
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  import sqlite3

  from string import Template

  import time
@@ -81,17 +79,17 @@ 

          super(IpsilonTest, self).__init__('testcleanup', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -99,7 +97,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting first SP's httpd server")

+         self.setup_step("Starting first SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -113,52 +111,25 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess.add_server(sp1name, 'https://127.0.0.11:45081')

  

-     print("testcleanup: Verify logged out state ...", end=' ')

-     try:

+     with TC.case('Verify logged out state'):

          page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/')

          page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testcleanup: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticating to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testcleanup: Add SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, sp1name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testcleanup: Access first SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access first SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testcleanup: Verify logged in state ...", end=' ')

-     try:

+     with TC.case('Verify logged in state'):

          page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/')

          page.expected_value('//div[@id="content"]/p/a/text()', None)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

- 

-     print("testcleanup: Checking that SAML2 sessions were created ...",

-           end=' ')

-     try:

+ 

+     with TC.case('Checking that SAML2 sessions were created'):

          sess_db = os.path.join(os.environ['TESTDIR'],

                                 'lib/idp1/saml2.sessions.db.sqlite')

          conn = sqlite3.connect(sess_db)
@@ -167,29 +138,17 @@ 

          if len(cur.fetchall()) == 0:

              raise ValueError('SAML2 sessions not created')

          conn.close()

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      # Sessions are valid for six seconds, and we clean up once per minute.

      # However, checking after a minute is kinda cutting it close, so we add ten

      # seconds to make sure the system has had time to clean up.

-     print("Waiting a minute for cleanup to happen ...")

      time.sleep(70)

  

-     print("testcleanup: Verify logged out state ...", end=' ')

-     try:

+     with TC.case('Verify logged out state'):

          page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/')

          page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

- 

-     print("testcleanup: Checking that SAML2 sessions were destroyed ...",

-           end=' ')

-     try:

+ 

+     with TC.case('Checking that SAML2 sessions were destroyed'):

          sess_db = os.path.join(os.environ['TESTDIR'],

                                 'lib/idp1/saml2.sessions.db.sqlite')

          conn = sqlite3.connect(sess_db)
@@ -197,7 +156,4 @@ 

          cur.execute('SELECT * FROM saml2_sessions;')

-         if len(cur.fetchall()) != 0:

-             raise ValueError('SAML2 sessions left behind: %s' % cur.fetchall())

+         # Fetch once: a second fetchall() on the same cursor returns []

+         rows = cur.fetchall()

+         if len(rows) != 0:

+             raise ValueError('SAML2 sessions left behind: %s' % rows)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+27 -64
@@ -2,13 +2,11 @@ 

  #

  # Copyright (C) 2017 Ipsilon project Contributors, for license see COPYING

  

- from __future__ import print_function

- 

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  import subprocess

  from string import Template

  
@@ -118,25 +116,21 @@ 

                                   stdout=subprocess.PIPE)

              stdout, _ = p.communicate()

          except OSError:

-             print("No etcd installed")

-             return False

+             return 'No etcd installed'

          if not p.wait() == 0:

-             print('No etcd installed')

-             return False

+             return 'No etcd installed'

          # Example line: etcd Version: 3.0.13

          if int(stdout.split('\n')[0].split(': ')[1][0]) < 3:

-             print('Etcd version < 3.0')

-             return False

+             return 'Etcd version < 3.0'

          try:

              import etcd  # pylint: disable=unused-variable,import-error

          except ImportError:

-             print('No python-etcd available')

-             return False

-         return True

+             return 'No python-etcd available'

+         return None

  

      def setup_servers(self, env=None):

  

-         print("Starting IDP's etcd server")

+         self.setup_step("Starting IDP's etcd server")

          datadir = os.path.join(self.testdir, 'etcd')

          os.mkdir(datadir)

          addr = '127.0.0.10'
@@ -144,17 +138,17 @@ 

          srvport = '42380'

          self.start_etcd_server(datadir, addr, clientport, srvport, env)

  

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing first SP server")

+         self.setup_step("Installing first SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -162,10 +156,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting first SP's httpd server")

+         self.setup_step("Starting first SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing second SP server")

+         self.setup_step("Installing second SP server")

          name = 'sp2-test.example.com'

          addr = '127.0.0.11'

          port = '45082'
@@ -176,7 +170,7 @@ 

          os.remove(os.path.dirname(sp) + '/pw.txt')

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting second SP's httpd server")

+         self.setup_step("Starting second SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -192,72 +186,41 @@ 

      sess.add_server(sp1name, 'https://127.0.0.11:45081')

      sess.add_server(sp2name, 'https://127.0.0.11:45082')

  

-     print("etcd: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("etcd: Add first SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add first SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, sp1name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("etcd: Access first SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access first SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("etcd: Access second SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access second SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("etcd: Update second SP ...", end=' ')

-     try:

+     with TC.case('Update Second SP'):

          # This is a test to see whether we can update SAML SPs where the name

          # is an FQDN (includes hyphens and dots). See bug #196

          sess.set_attributes_and_mapping(idpname, [],

                                          ['namefull', 'givenname', 'surname'],

                                          spname=sp2name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

  

-     print("etcd: Try authentication failure ...", end=' ')

      newsess = HttpSessions()

      newsess.add_server(idpname, 'https://127.0.0.10:45080', user, 'wrong')

-     try:

-         newsess.auth_to_idp(idpname)

-         print(" ERROR: Authentication should have failed", file=sys.stderr)

-         sys.exit(1)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" SUCCESS")

- 

-     print("etcd: Add keyless SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Try authentication failure'):

+         try:

+             newsess.auth_to_idp(idpname)

+         except Exception as e:  # pylint: disable=broad-except

+             pass

+         else:

+             raise ValueError('Authentication should have failed')

+ 

+     with TC.case('Add keyless SP Metadata to IdP'):

          sess.add_metadata(idpname, 'keyless', keyless_metadata)

          page = sess.fetch_page(idpname, 'https://127.0.0.10:45080/idp1/admin/'

                                          'providers/saml2/admin')

          page.expected_value('//div[@id="row_provider_http://keyless-sp"]/'

                              '@title',

                              'WARNING: SP does not have signing keys!')

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+15 -37
@@ -1,15 +1,13 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

  from helpers.common import WRAP_HOSTNAME  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
@@ -96,16 +94,16 @@ 

      def setup_servers(self, env=None):

          os.mkdir("%s/ccaches" % self.testdir)

  

-         print("Installing KDC server")

+         self.setup_step("Installing KDC server")

          kdcenv = self.setup_kdc(env)

  

-         print("Creating principals and keytabs")

+         self.setup_step("Creating principals and keytabs")

          self.setup_keys(kdcenv)

  

-         print("Getting a TGT")

+         self.setup_step("Getting a TGT")

          self.kinit_keytab(kdcenv)

  

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = 'idp.ipsilon.dev'

          port = '45080'
@@ -113,10 +111,10 @@ 

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing first SP server")

+         self.setup_step("Installing first SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -124,10 +122,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting first SP's httpd server")

+         self.setup_step("Starting first SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing second SP server")

+         self.setup_step("Installing second SP server")

          name = 'sp2'

          addr = '127.0.0.11'

          port = '45082'
@@ -138,7 +136,7 @@ 

          os.remove(os.path.dirname(sp) + '/pw.txt')

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting second SP's httpd server")

+         self.setup_step("Starting second SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -165,36 +163,16 @@ 

      sess.add_server(sp1name, 'https://127.0.0.11:45081')

      sess.add_server(sp2name, 'https://127.0.0.11:45082')

  

-     print("testgssapi: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname, krb=True)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testgssapi: Add first SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add first SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, sp1name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testgssapi: Access first SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access first SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testgssapi: Access second SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access second SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+32 -126
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  
@@ -143,21 +141,21 @@ 

          super(IpsilonTest, self).__init__('testlogout', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

          for spdata in splist:

              nameid = spdata['nameid']

              addr = spdata['addr']

              port = spdata['port']

-             print("Installing SP server %s" % nameid)

+             self.setup_step("Installing SP server %s" % nameid)

  

              # Configure sp3 and sp4 for only HTTP Redirect to test

              # that a mix of SOAP and HTTP Redirect will play nice
@@ -173,7 +171,7 @@ 

              conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env)

              fixup_sp_httpd(os.path.dirname(conf))

  

-             print("Starting SP's httpd server")

+             self.setup_step("Starting SP's httpd server")

              self.start_http_server(conf, env)

  

  
@@ -189,188 +187,96 @@ 

          spurl = 'https://%s:%s' % (sp['addr'], sp['port'])

          sess.add_server(spname, spurl)

  

-     print("testlogout: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      for sp in splist:

          spname = sp['nameid']

-         print("testlogout: Add SP Metadata for %s to IDP ..." % spname,

-               end=' ')

-         try:

+         with TC.case('Add SP Metadata for %s to IdP' % spname):

              sess.add_sp_metadata(idpname, spname)

-         except Exception as e:  # pylint: disable=broad-except

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         print(" SUCCESS")

  

-     print("testlogout: Logout without logging into SP ...", end=' ')

-     try:

+     with TC.case('Logout without logging into SP'):

          page = sess.fetch_page(idpname, '%s/%s?%s' % (

              'https://127.0.0.11:45081', 'saml2/logout',

              'ReturnTo=https://127.0.0.11:45081/open/logged_out.html'))

          page.expected_value('text()', 'Logged out')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Access SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Logout from SP ...", end=' ')

-     try:

+     with TC.case('Logout from SP'):

          page = sess.fetch_page(idpname, '%s/%s?%s' % (

              'https://127.0.0.11:45081', 'saml2/logout',

              'ReturnTo=https://127.0.0.11:45081/open/logged_out.html'))

          page.expected_value('text()', 'Logged out')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Try logout again ...", end=' ')

-     try:

+     with TC.case('Try logout again'):

          page = sess.fetch_page(idpname, '%s/%s?%s' % (

              'https://127.0.0.11:45081', 'saml2/logout',

              'ReturnTo=https://127.0.0.11:45081/open/logged_out.html'))

          page.expected_value('text()', 'Logged out')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Ensure logout ...", end=' ')

-     try:

+     with TC.case('Ensure logout'):

          ensure_logout(sess, idpname, 'https://127.0.0.11:45081/sp/')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      # Test logout from each of the SP's in the list to ensure that the

      # order of logout doesn't matter.

      for sporder in splist:

-         print("testlogout: Access SP Protected Area of each SP ...", end=' ')

-         for sp in splist:

-             spname = sp['nameid']

-             spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port'])

-             try:

+         with TC.case('Access SP Protected Area of all SPs'):

+             for sp in splist:

+                 spname = sp['nameid']

+                 spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port'])

                  page = sess.fetch_page(idpname, spurl)

                  page.expected_value('text()', 'WORKS!')

-             except ValueError as e:

-                 print(" ERROR: %s" % repr(e), file=sys.stderr)

-                 sys.exit(1)

-         print(" SUCCESS")

- 

-         print("testlogout: Initiate logout from %s ..." % sporder['nameid'],

-               end=' ')

-         try:

+ 

+         with TC.case('Initiate logout from %s' % sporder['nameid']):

              logouturl = 'https://%s:%s' % (sp['addr'], sp['port'])

              page = sess.fetch_page(idpname, '%s/%s?%s' % (

                  logouturl, 'saml2/logout',

                  'ReturnTo=https://127.0.0.11:45081/open/logged_out.html'))

              page.expected_value('text()', 'Logged out')

-         except ValueError as e:

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         print(" SUCCESS")

- 

-         print("testlogout: Ensure logout of each SP ...", end=' ')

-         for sp in splist:

-             spname = sp['nameid']

-             spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port'])

-             try:

+ 

+         with TC.case('Ensure logout of each SP'):

+             for sp in splist:

+                 spname = sp['nameid']

+                 spurl = 'https://%s:%s/sp/' % (sp['addr'], sp['port'])

                  ensure_logout(sess, idpname, spurl)

-             except ValueError as e:

-                 print(" ERROR: %s" % repr(e), file=sys.stderr)

-                 sys.exit(1)

-         print(" SUCCESS")

  

      # Test IdP-initiated logout

-     print("testlogout: Access SP Protected Area of SP1...", end=' ')

-     try:

+     with TC.case('Access SP Protected Area of SP1'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Access SP Protected Area of SP2...", end=' ')

-     try:

+     with TC.case('Access SP Protected Area of SP2'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Access the IdP...", end=' ')

-     try:

+     with TC.case('Access the IdP'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.10:45080/%s' % idpname)

          page.expected_value('//div[@id="welcome"]/p/text()',

                              'Welcome %s!' % user)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: IdP-initiated logout ...", end=' ')

-     try:

+     with TC.case('IdP-initiated logout'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.10:45080/%s/logout' % idpname)

          page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Ensure logout of SP1 ...", end=' ')

-     try:

+     with TC.case('Ensure logout of SP1'):

          ensure_logout(sess, idpname, 'https://127.0.0.11:45081/sp/')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Ensure logout of SP2 ...", end=' ')

-     try:

+     with TC.case('Ensure logout of SP2'):

          ensure_logout(sess, idpname, 'https://127.0.0.11:45082/sp/')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: Access the IdP...", end=' ')

-     try:

+     with TC.case('Access the IdP'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.10:45080/%s/login' % idpname)

          page.expected_value('//div[@id="welcome"]/p/text()',

                              'Welcome %s!' % user)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testlogout: IdP-initiated logout with no SP sessions...", end=' ')

-     try:

+     with TC.case('IdP-initiated logout with no SP sessions'):

          page = sess.fetch_page(idpname,

                                 'https://127.0.0.10:45080/%s/logout' % idpname)

          page.expected_value('//div[@id="logout"]/p//text()',

                              'Successfully logged out.')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

file modified
+44 -169
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  # -*- coding: utf-8 -*-

  #

- # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

- import sys

  import pwd

  from string import Template

  
@@ -145,14 +143,14 @@ 

          super(IpsilonTest, self).__init__('testmapping', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

          for spdata in sp_list:
@@ -160,12 +158,12 @@ 

              port = spdata['port']

              name = spdata['name']

  

-             print("Installing SP server %s" % name)

+             self.setup_step("Installing SP server %s" % name)

              sp_prof = self.generate_profile(sp_g, sp_a, name, addr, str(port))

              conf = self.setup_sp_server(sp_prof, name, addr, str(port), env)

              fixup_sp_httpd(os.path.dirname(conf))

  

-             print("Starting SP's httpd server")

+             self.setup_step("Starting SP's httpd server")

              self.start_http_server(conf, env)

  

  
@@ -182,24 +180,13 @@ 

      sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

      sess.add_server(sp['name'], spurl)

  

-     print("testmapping: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testmapping: Add SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     try:

-         print("testmapping: Test default mapping and attrs ...", end=' ')

+     with TC.case('Test default mapping and attrs'):

          expect = {

              'NAME_ID': user,

              'fullname': 'Test User %s' % user,
@@ -209,25 +196,14 @@ 

              'groups': user,

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testmapping: Set default global mapping ...", end=' ')

-     try:

+     with TC.case('Set default global mapping'):

          sess.set_attributes_and_mapping(

              idpname,

              [['*', '*'],

               ['fullname', 'namefull']])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     try:

-         print("testmapping: Test global mapping ...", end=' ')

+ 

+     with TC.case('Test global mapping'):

          expect = {

              'fullname': 'Test User %s' % user,

              'namefull': 'Test User %s' % user,
@@ -237,52 +213,28 @@ 

              'groups': user

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Set default allowed attributes ...", end=' ')

-     try:

+ 

+     with TC.case('Set default allowed attributes'):

          sess.set_attributes_and_mapping(

              idpname,

              [],

              ['namefull', 'givenname', 'surname'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     try:

-         print("testmapping: Test global allowed attributes ...", end=' ')

+ 

+     with TC.case('Test global allowed attributes'):

          expect = {

              'namefull': 'Test User %s' % user,

              'surname': user,

              'givenname': u'Test User 一',

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Set SP allowed attributes ...", end=' ')

-     try:

+ 

+     with TC.case('Set SP allowed attributes'):

          sess.set_attributes_and_mapping(

              idpname, [['*', '*']],

              ['wholename', 'givenname', 'surname',

               'email', 'fullname'], sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Test SP allowed atributes ...", end=' ')

-     try:

+ 

+     with TC.case('Test SP allowed attributes'):

          expect = {

              'fullname': 'Test User %s' % user,

              'surname': user,
@@ -290,14 +242,8 @@ 

              'email': '%s@example.com' % user,

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Set SP attribute mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Set SP attribute mapping'):

          sess.set_attributes_and_mapping(

              idpname,

              [['*', '*'],
@@ -306,14 +252,8 @@ 

               'surname',

               'email', 'fullname'],

              sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Test SP attribute mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Test SP attribute mapping'):

          expect = {

              'wholename': 'Test User %s' % user,

              'fullname': 'Test User %s' % user,
@@ -322,14 +262,8 @@ 

              'email': '%s@example.com' % user,

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Set SP URL attribute mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Set SP URL attribute mapping'):

          sess.set_attributes_and_mapping(

              idpname,

              [['*', '*'],
@@ -342,14 +276,8 @@ 

               'email',

               'fullname'],

              sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Test SP URL attribute mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Test SP URL attribute mapping'):

          expect = {

              'http://localhost/SAML/Name': 'Test User %s' % user,

              'https://localhost/SAML/Name': 'Test User %s' % user,
@@ -359,42 +287,24 @@ 

              'email': '%s@example.com' % user,

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Set SP explicit mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Set SP explicit mapping'):

          sess.set_attributes_and_mapping(

              idpname,

              [['fullname', 'wholename'],

               ['email', 'email']],

              ['wholename', 'email'],

              sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Test SP explicit mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Test SP explicit mapping'):

          expect = {

              'wholename': 'Test User %s' % user,

              'email': '%s@example.com' % user,

              'NAME_ID': user,

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Set SP username mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Set SP username mapping'):

          sess.set_attributes_and_mapping(

              idpname,

              [['*', '*'],
@@ -404,14 +314,8 @@ 

               'surname',

               'email', 'fullname'],

              sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Test SP username mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Test SP username mapping'):

          expect = {

              'wholename': 'Test User %s' % user,

              'fullname': 'Test User %s' % user,
@@ -421,26 +325,14 @@ 

              'NAME_ID': '%s@example.com' % user,

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Drop SP attribute mapping ...", end=' ')

-     try:

+ 

+     with TC.case('Drop SP attribute mapping'):

          sess.set_attributes_and_mapping(

              idpname, [],

              ['givenname', 'surname', 'email',

               'fullname'], sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Test SP attr mapping with default allowed...", end=' ')

-     try:

+ 

+     with TC.case('Test SP attribute mapping with default allowed'):

          expect = {

              'fullname': 'Test User %s' % user,

              'surname': user,
@@ -448,31 +340,14 @@ 

              'email': '%s@example.com' % user,

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Drop SP allowed attributes ...", end=' ')

-     try:

+ 

+     with TC.case('Drop SP allowed attributes'):

          sess.set_attributes_and_mapping(idpname, [], [], sp['name'])

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

- 

-     print("testmapping: Test mapping, should be back to global...", end=' ')

-     try:

+ 

+     with TC.case('Test mapping, should be back to global'):

          expect = {

              'namefull': 'Test User %s' % user,

              'surname': user,

              'givenname': u'Test User 一',

          }

          check_info_plugin(sess, idpname, spurl, expect)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

file modified
+52 -132
@@ -1,17 +1,15 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

  from helpers.common import WRAP_HOSTNAME  # pylint: disable=relative-import

  from helpers.common import TESTREALM  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP

  import os

  import pwd

- import sys

  import re

  from string import Template

  
@@ -120,23 +118,23 @@ 

      def setup_servers(self, env=None):

          os.mkdir("%s/ccaches" % self.testdir)

  

-         print("Installing KDC server")

+         self.setup_step("Installing KDC server")

          kdcenv = self.setup_kdc(env)

  

-         print("Creating principals and keytabs")

+         self.setup_step("Creating principals and keytabs")

          self.setup_keys(kdcenv)

  

-         print("Getting a TGT")

+         self.setup_step("Getting a TGT")

          self.kinit_keytab(kdcenv)

  

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = WRAP_HOSTNAME

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          env.update(kdcenv)

          self.start_http_server(conf, env)

  
@@ -144,14 +142,14 @@ 

              nameid = spdata['nameid']

              addr = spdata['addr']

              port = spdata['port']

-             print("Installing SP server %s" % nameid)

+             self.setup_step("Installing SP server %s" % nameid)

              sp_prof = self.generate_profile(

                  sp_g, sp_a, nameid, addr, str(port), nameid

              )

              conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env)

              fixup_sp_httpd(os.path.dirname(conf))

  

-             print("Starting SP's httpd server")

+             self.setup_step("Starting SP's httpd server")

              self.start_http_server(conf, env)

  

  
@@ -161,15 +159,15 @@ 

      user = pwd.getpwuid(os.getuid())[0]

  

      expected = {

-         'x509':        False,   # not supported

-         'transient':   True,

-         'persistent':  True,

-         'windows':     False,   # not supported

-         'encrypted':   False,   # not supported

-         'kerberos':    True,

-         'email':       True,

-         'unspecified': True,

-         'entity':      False,   # not supported

+         'x509':        True,   # not supported

+         'transient':   False,

+         'persistent':  False,

+         'windows':     True,   # not supported

+         'encrypted':   True,   # not supported

+         'kerberos':    False,

+         'email':       False,

+         'unspecified': False,

+         'entity':      True,   # not supported

      }

  

      expected_re = {
@@ -204,70 +202,41 @@ 

                          'ipsilon')

          sess.add_server(spname, spurl)

  

-         print("")

-         print("testnameid: Testing NameID format %s ..." % spname)

+         TC.info('Testing NameID format %s' % spname)

  

          if spname == 'kerberos':

              krb = True

  

-         print("testnameid: Authenticate to IDP ...", end=' ')

-         try:

+         with TC.case('Authenticate to IdP'):

              sess.auth_to_idp(idpname, krb=krb)

-         except Exception as e:  # pylint: disable=broad-except

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         print(" SUCCESS")

  

-         print("testnameid: Add SP Metadata to IDP ...", end=' ')

-         try:

+         with TC.case('Add SP Metadata to IdP'):

              sess.add_sp_metadata(idpname, spname)

-         except Exception as e:  # pylint: disable=broad-except

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         print(" SUCCESS")

  

-         print("testnameid: Set supported Name ID formats ...", end=' ')

-         try:

+         with TC.case('Set supported Name ID formats'):

              sess.set_sp_default_nameids(idpname, spname, [spname])

-         except Exception as e:  # pylint: disable=broad-except

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         print(" SUCCESS")

  

-         print("testnameid: Access SP Protected Area ...", end=' ')

-         try:

+         with TC.case('Access SP Protected Area',

+                      should_fail=bool(expected[spname])):

              page = sess.fetch_page(idpname, '%s/sp/' % spurl)

              if not re.match(expected_re[spname], page.text):

                  raise ValueError(

                      'page did not contain expression %s' %

                      expected_re[spname]

                  )

-         except ValueError as e:

-             if expected[spname]:

-                 print(" ERROR: %s" % repr(e), file=sys.stderr)

-                 sys.exit(1)

-             print(" OK, EXPECTED TO FAIL")

-         else:

-             print(" SUCCESS")

- 

-         print("testnameid: Try authentication failure ...", end=' ')

+ 

          newsess = HttpSessions()

          newsess.add_server(idpname, 'https://%s:45080' % WRAP_HOSTNAME,

                             user, 'wrong')

-         try:

+         with TC.case('Try authentication failure', should_fail=True):

              newsess.auth_to_idp(idpname)

-             print(" ERROR: Authentication should have failed", file=sys.stderr)

-             sys.exit(1)

-         except Exception as e:  # pylint: disable=broad-except

-             print(" SUCCESS")

  

      # Ensure that transient names change with each authentication

      sp = get_sp_by_nameid(sp_list, 'transient')

      spname = sp['nameid']

      spurl = 'https://%s:%s' % (sp['addr'], sp['port'])

  

-     print("")

-     print("testnameid: Testing NameID format %s ..." % spname)

+     TC.info('Testing NameID format %s' % spname)

  

      ids = []

      for i in xrange(4):
@@ -275,58 +244,33 @@ 

          sess.add_server(idpname, 'https://%s:45080' % WRAP_HOSTNAME,

                          user, 'ipsilon')

          sess.add_server(spname, spurl)

-         print("testnameid: Authenticate to IDP ...", end=' ')

-         try:

+         with TC.case('Authenticate to IdP'):

              sess.auth_to_idp(idpname)

-         except Exception as e:  # pylint: disable=broad-except

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

- 

-         print("testnameid: Access SP ...", end=' ')

-         try:

+ 

+         with TC.case('Access SP'):

              page = sess.fetch_page(idpname, '%s/sp/' % spurl)

              t1 = page.text

-         except ValueError as e:

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

- 

-         print("testnameid: Access SP again ...", end=' ')

-         try:

+ 

+         with TC.case('Access SP again'):

              page = sess.fetch_page(idpname, '%s/sp/' % spurl)

              t2 = page.text

-         except ValueError as e:

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

- 

-         print("testnameid: Ensure ID is consistent between requests ...",

-               end=' ')

-         if t1 != t2:

-             print(" ERROR: New ID between reqeusts", file=sys.stderr)

-         else:

-             print(" SUCCESS")

+ 

+         with TC.case('Ensure ID is consistent between requests'):

+             if t1 != t2:

+                 raise ValueError('New ID between requests')

  

          ids.append(t1)

  

-     print("testnameid: Ensure uniqueness across sessions ...", end=' ')

-     if len(ids) != len(set(ids)):

-         print(" ERROR: IDs are not unique between sessions", file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

+     with TC.case('Ensure uniqueness across sessions'):

+         if len(ids) != len(set(ids)):

+             raise ValueError('IDs are not unique between sessions')

  

      # Ensure that persistent names remain the same with each authentication

      sp = get_sp_by_nameid(sp_list, 'persistent')

      spname = sp['nameid']

      spurl = 'https://%s:%s' % (sp['addr'], sp['port'])

  

-     print("")

-     print("testnameid: Testing NameID format %s ..." % spname)

+     TC.info('Testing NameID format %s' % spname)

  

      ids = []

      for i in xrange(4):
@@ -334,47 +278,23 @@ 

          sess.add_server(idpname, 'https://%s:45080' % WRAP_HOSTNAME,

                          user, 'ipsilon')

          sess.add_server(spname, spurl)

-         print("testnameid: Authenticate to IDP ...", end=' ')

-         try:

+         with TC.case('Authenticate to IdP'):

              sess.auth_to_idp(idpname)

-         except Exception as e:  # pylint: disable=broad-except

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

- 

-         print("testnameid: Access SP ...", end=' ')

-         try:

+ 

+         with TC.case('Access SP'):

              page = sess.fetch_page(idpname, '%s/sp/' % spurl)

              t1 = page.text

-         except ValueError as e:

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

- 

-         print("testnameid: Access SP again ...", end=' ')

-         try:

+ 

+         with TC.case('Access SP again'):

              page = sess.fetch_page(idpname, '%s/sp/' % spurl)

              t2 = page.text

-         except ValueError as e:

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

- 

-         print("testnameid: Ensure ID is consistent between requests ...",

-               end=' ')

-         if t1 != t2:

-             print(" ERROR: New ID between reqeusts", file=sys.stderr)

-         else:

-             print(" SUCCESS")

+ 

+         with TC.case('Ensure ID is consistent between requests'):

+             if t1 != t2:

+                 raise ValueError('New ID between requests')

  

          ids.append(t1)

  

-     print("testnameid: Ensure same ID across sessions ...", end=' ')

-     if len(set(ids)) != 1:

-         print(" ERROR: IDs are not the same between sessions", file=sys.stderr)

-         sys.exit(1)

-     else:

-         print(" SUCCESS")

+     with TC.case('Ensure same ID across sessions'):

+         if len(set(ids)) != 1:

+             raise ValueError('IDs are not the same between sessions')

file modified
+28 -98
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2015-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  
@@ -101,17 +99,17 @@ 

          super(IpsilonTest, self).__init__('testrest', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -119,10 +117,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf), name)

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing second SP server")

+         self.setup_step("Installing second SP server")

          name = 'sp2-test.example.com'

          addr = '127.0.0.10'

          port = '45082'
@@ -130,10 +128,10 @@ 

          conf = self.setup_sp_server(sp2, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf), name)

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing third SP server")

+         self.setup_step("Installing third SP server")

          name = 'sp3_invalid'

          addr = '127.0.0.10'

          port = '45083'
@@ -141,7 +139,7 @@ 

          conf = self.setup_sp_server(sp3, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf), name)

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -159,36 +157,20 @@ 

      sess.add_server(sp2name, 'https://127.0.0.10:45082')

      sess.add_server(sp3name, 'https://127.0.0.10:45083')

  

-     print("testrest: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testrest: List initial Service Providers via REST ...", end=' ')

-     try:

+     with TC.case('List initial Service Providers via REST'):

          result = sess.get_rest_sp(idpname)

          if len(result['result']) != 0:

              raise ValueError(

                  'Expected no SP and got %d' % len(result['result'])

              )

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testrest: Add SP Metadata to IDP via admin ...", end=' ')

-     try:

+     with TC.case('Add SP Metadata to IdP via admin'):

          sess.add_sp_metadata(idpname, spname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testrest: List Service Providers via REST ...", end=' ')

-     try:

+     with TC.case('List Service Providers via REST'):

          result = sess.get_rest_sp(idpname)

          if len(result['result']) != 1:

              raise ValueError(
@@ -199,33 +181,18 @@ 

                  'Expected %s and got %s' %

                  (spname, result['result'][0].get('provider'))

              )

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testrest: Add Service Provider via REST ...", end=' ')

-     try:

+     with TC.case('Add Service Provider via REST'):

          sess.add_sp_metadata(idpname, sp2name, rest=True)

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testrest: List Service Providers via REST ...", end=' ')

-     try:

+     with TC.case('List Service Providers via REST'):

          result = sess.get_rest_sp(idpname)

          if len(result['result']) != 2:

              raise ValueError(

                  'Expected 2 SPs and got %d' % len(result['result'])

              )

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testrest: List Specific Service Providers via REST ...", end=' ')

-     try:

+     with TC.case('List specific Service Providers via REST'):

          result = sess.get_rest_sp(idpname, spname)

          if len(result['result']) != 1:

              raise ValueError(
@@ -248,61 +215,24 @@ 

                  'Expected %s and got %s' %

                  (spname, result['result'][0].get('provider'))

              )

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

      # Now for some negative testing

  

-     print("testrest: Add illegally named Service Provider via REST ...",

-           end=' ')

-     try:

+     with TC.case('Add illegally named Service Provider via REST',

+                  should_fail='[400]'):

          sess.add_sp_metadata(idpname, sp3name, rest=True)

-     except ValueError as e:

-         print(" SUCCESS")

-     else:

-         print("ERROR: "

-               "Adding SP with invalid name should have failed and it didn't",

-               file=sys.stderr)

-         sys.exit(1)

- 

-     print("testrest: Fetch non-existent REST endpoint ...", end=' ')

-     try:

+ 

+     with TC.case('Fetch non-existent REST endpoint',

+                  should_fail='(501)'):

          result = sess.fetch_rest_page(

              idpname,

              '/%s/rest/providers/saml2/notfound' % idpname

          )

-     except ValueError as e:

-         if '(501)' not in e.message:

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

-     else:

-         print("ERROR: should have returned a 404", file=sys.stderr)

-         sys.exit(1)

- 

-     print("testrest: Fetch non-existent SP via REST ...", end=' ')

-     try:

+ 

+     with TC.case('Fetch non-existent SP via REST',

+                  should_fail='(404)'):

          result = sess.get_rest_sp(idpname, 'foo')

-     except ValueError as e:

-         if '(404)' not in e.message:

-             print(" ERROR: %s" % repr(e), file=sys.stderr)

-             sys.exit(1)

-         else:

-             print(" SUCCESS")

-     else:

-         print("ERROR: should have returned a 404", file=sys.stderr)

-         sys.exit(1)

- 

-     print("testrest: Re-add Service Provider via REST ...", end=' ')

-     try:

+ 

+     with TC.case('Re-add Service Provider via REST',

+                  should_fail='[400]'):

          sess.add_sp_metadata(idpname, sp2name, rest=True)

-     except ValueError as e:

-         print(" SUCCESS")

-     else:

-         print("ERROR: "

-               "Adding duplicate SP should have failed and it didn't",

-               file=sys.stderr)

-         sys.exit(1)

file modified
+19 -41
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
@@ -105,17 +103,17 @@ 

          super(IpsilonTest, self).__init__('testroot', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'root'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing first SP server")

+         self.setup_step("Installing first SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -123,10 +121,10 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting first SP's httpd server")

+         self.setup_step("Starting first SP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing second SP server")

+         self.setup_step("Installing second SP server")

          name = 'sp2-test.example.com'

          addr = '127.0.0.11'

          port = '45082'
@@ -137,7 +135,7 @@ 

          os.remove(os.path.dirname(sp) + '/pw.txt')

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting second SP's httpd server")

+         self.setup_step("Starting second SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -153,46 +151,26 @@ 

      sess.add_server(sp1name, 'https://127.0.0.11:45081')

      sess.add_server(sp2name, 'https://127.0.0.11:45082')

  

-     print("testroot: Authenticate to IDP ...", end=' ')

-     try:

+     with TC.case('Authenticate to IdP'):

          sess.auth_to_idp(idpname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testroot: Add first SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add first SP Metadata to IdP'):

          sess.add_sp_metadata(idpname, sp1name)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testroot: Access first SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access first SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testroot: Access second SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access second SP Protected Area'):

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45082/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("testroot: Try authentication failure ...", end=' ')

      newsess = HttpSessions()

      newsess.add_server(idpname, 'https://127.0.0.10:45080', user, 'wrong')

-     try:

-         newsess.auth_to_idp(idpname)

-         print(" ERROR: Authentication should have failed", file=sys.stderr)

-         sys.exit(1)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" SUCCESS")

+     with TC.case('Try authentication failure'):

+         try:

+             newsess.auth_to_idp(idpname)

+         except Exception as e:  # pylint: disable=broad-except

+             pass

+         else:

+             raise ValueError('Authentication should have failed')

file modified
+120 -35
@@ -1,6 +1,6 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from __future__ import print_function

  
@@ -8,33 +8,51 @@ 

  import pkg_resources  # pylint: disable=unused-import

  

  import argparse

- import inspect

  from ipsilon.util import plugin

  import os

  import sys

  import subprocess

- import time

- import traceback

  from helpers.common import WRAP_HOSTNAME  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  

  

  logger = None

  

  

- class Tests(object):

+ VERBOSE_SHOWTESTS = 1

+ VERBOSE_SHOWCASES = 2

+ VERBOSE_SHOWOUTPUT = 3

  

-     def __init__(self):

-         p = plugin.Plugins()

-         (pathname, dummy) = os.path.split(inspect.getfile(Tests))

-         self.plugins = p.get_plugins(pathname, 'IpsilonTest')

+ 

+ TEST_RESULT_SUCCESS = 0

+ TEST_RESULT_SKIP = 1

+ TEST_RESULT_FAIL = 2

+ TEST_RESULT_EXCEPTION = 3

+ TEST_RESULT_SETUP_FAILED = 4

+ 

+ 

+ def get_tests():

+     p = plugin.Plugins()

+     (pathname, _) = os.path.split(os.path.realpath(__file__))

+     return p.get_plugins(pathname, 'IpsilonTest')

  

  

  def parse_args():

      parser = argparse.ArgumentParser(description='Ipsilon Tests Environment')

+     parser.add_argument('--results-header', default='Test results:',

+                         help='Test results header')

      parser.add_argument('--path', default='%s/testdir' % os.getcwd(),

                          help="Directory in which tests are run")

-     parser.add_argument('--test', default='test1',

-                         help="The test to run")

+     parser.add_argument('--fail-on-first-error', '-x', action='store_true',

+                         help='Abort test run on first test failure')

+     parser.add_argument('--test', action='append', default=None,

+                         help="Add a test to run")

+     parser.add_argument('--list-tests', '-L', action='store_true',

+                         help='List all available tests')

+     parser.add_argument('--no-overview', '-q', action='store_true',

+                         help='Suppress final summary')

+     parser.add_argument('--verbose', '-v', action='count',

+                         help='Increase verbosity')

      parser.add_argument('--wrappers', default='auto',

                          choices=['yes', 'no', 'auto'],

                          help="Run the tests with socket wrappers")
@@ -79,23 +97,17 @@ 

      return wenv

  

  

- if __name__ == '__main__':

- 

-     args = parse_args()

- 

-     tests = Tests()

-     if args['test'] not in tests.plugins:

-         print("Unknown test [%s]" % args['test'], file=sys.stderr)

-         sys.exit(1)

-     test = tests.plugins[args['test']]

+ def run_test(testname, test, args):

+     supported = test.platform_supported()

+     if supported is not None:

+         return (TEST_RESULT_SKIP, supported)

+     if args['verbose'] <= VERBOSE_SHOWOUTPUT:

+         devnull = open(os.devnull, 'w')

+         test.stdout = devnull

+         test.stderr = devnull

  

-     if not test.platform_supported():

-         print("Test %s not supported on platform" % args['test'],

-               file=sys.stderr)

-         sys.exit(0)

- 

-     if not os.path.exists(args['path']):

-         os.makedirs(args['path'])

+     if args['verbose'] >= VERBOSE_SHOWCASES:

+         test.print_cases = True

  

      test.setup_base(args['path'], test)

  
@@ -103,19 +115,92 @@ 

      env['PYTHONPATH'] = test.rootdir

      env['TESTDIR'] = test.testdir

  

+     results = []

+     post_setup = False

+     TC.store_results(results)

      try:

          test.setup_servers(env)

+         post_setup = True

  

-         code = test.run(env)

+         code, results = test.run(env)

          if code:

-             sys.exit(code)

+             return (TEST_RESULT_FAIL, code, results)

      except Exception as e:  # pylint: disable=broad-except

-         print("Error: %s" % repr(e), file=sys.stderr)

-         traceback.print_exc(None, sys.stderr)

-         sys.exit(1)

+         if post_setup:

+             return (TEST_RESULT_EXCEPTION, e, results)

+         else:

+             return (TEST_RESULT_SETUP_FAILED, test.current_setup_step)

      finally:

          test.wait()

  

-     # Wait until all of the sockets are closed by the OS

-     time.sleep(0.5)

-     print("FINISHED")

+     return (TEST_RESULT_SUCCESS, results)

+ 

+ 

+ def result_to_str(result):

+     if result[0] == TEST_RESULT_SUCCESS:

+         return 'Test passed'

+     elif result[0] == TEST_RESULT_SKIP:

+         return 'Test skipped: %s' % result[1]

+     elif result[0] == TEST_RESULT_FAIL:

+         return 'Test failed with code %i' % result[1]

+     elif result[0] == TEST_RESULT_EXCEPTION:

+         return 'Test failed with error: %s' % repr(result[1])

+     elif result[0] == TEST_RESULT_SETUP_FAILED:

+         return 'Test setup failed at step: %s' % result[1]

+     else:

+         return 'Unknown test result %s' % result[0]

+ 

+ 

+ def result_is_fail(result):

+     return result[0] not in (TEST_RESULT_SUCCESS, TEST_RESULT_SKIP)

+ 

+ 

+ def main():

+     args = parse_args()

+ 

+     tests = get_tests()

+     if args['list_tests']:

+         for testname in tests.keys():

+             print(testname)

+         sys.exit(0)

+ 

+     if args['test'] is None:

+         args['test'] = tests.keys()

+     unknown_tests = False

+     for test in args['test']:

+         if test not in tests:

+             unknown_tests = True

+             print("Unknown test [%s]" % test, file=sys.stderr)

+     if unknown_tests:

+         sys.exit(1)

+     args['test'] = set(args['test'])

+ 

+     if not os.path.exists(args['path']):

+         os.makedirs(args['path'])

+ 

+     test_results = {}

+ 

+     for test in args['test']:

+         if args['verbose'] >= VERBOSE_SHOWTESTS:

+             print('Running test %s' % test)

+         result = run_test(test, tests[test], args)

+         test_results[test] = result

+ 

+         if args['verbose'] >= VERBOSE_SHOWTESTS:

+             print(result_to_str(result))

+ 

+         if args['fail_on_first_error'] and result_is_fail(result):

+             break

+ 

+     if not args['no_overview']:

+         print(args['results_header'])

+         for test in test_results:

+             print('{:15s} {}'.format(test, result_to_str(test_results[test])))

+ 

+     if any(result_is_fail(result)

+            for result in test_results.values()):

+         sys.exit(1)

+ 

+ 

+ if __name__ == '__main__':

+     main()

file modified
+8 -20
@@ -1,14 +1,12 @@ 

  #!/usr/bin/python

  #

- # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING

- 

- from __future__ import print_function

+ # Copyright (C) 2014-2017 Ipsilon project Contributors, for license see COPYING

  

  from helpers.common import IpsilonTestBase  # pylint: disable=relative-import

+ from helpers.control import TC  # pylint: disable=relative-import

  from helpers.http import HttpSessions  # pylint: disable=relative-import

  import os

  import pwd

- import sys

  from string import Template

  

  
@@ -78,17 +76,17 @@ 

          super(IpsilonTest, self).__init__('trans', __file__)

  

      def setup_servers(self, env=None):

-         print("Installing IDP server")

+         self.setup_step("Installing IDP server")

          name = 'idp1'

          addr = '127.0.0.10'

          port = '45080'

          idp = self.generate_profile(idp_g, idp_a, name, addr, port)

          conf = self.setup_idp_server(idp, name, addr, port, env)

  

-         print("Starting IDP's httpd server")

+         self.setup_step("Starting IDP's httpd server")

          self.start_http_server(conf, env)

  

-         print("Installing SP server")

+         self.setup_step("Installing SP server")

          name = 'sp1'

          addr = '127.0.0.11'

          port = '45081'
@@ -96,7 +94,7 @@ 

          conf = self.setup_sp_server(sp, name, addr, port, env)

          fixup_sp_httpd(os.path.dirname(conf))

  

-         print("Starting SP's httpd server")

+         self.setup_step("Starting SP's httpd server")

          self.start_http_server(conf, env)

  

  
@@ -106,26 +104,16 @@ 

      spname = 'sp1'

      user = pwd.getpwuid(os.getuid())[0]

  

-     print("trans: Add SP Metadata to IDP ...", end=' ')

-     try:

+     with TC.case('Add SP Metadata to IdP'):

          sess = HttpSessions()

          sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

          sess.add_server(spname, 'https://127.0.0.11:45081')

          sess.auth_to_idp(idpname)

          sess.add_sp_metadata(idpname, spname)

-     except Exception as e:  # pylint: disable=broad-except

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

  

-     print("trans: Access SP Protected Area ...", end=' ')

-     try:

+     with TC.case('Access SP Protected Area'):

          sess = HttpSessions()

          sess.add_server(idpname, 'https://127.0.0.10:45080', user, 'ipsilon')

          sess.add_server(spname, 'https://127.0.0.11:45081')

          page = sess.fetch_page(idpname, 'https://127.0.0.11:45081/sp/')

          page.expected_value('text()', 'WORKS!')

-     except ValueError as e:

-         print(" ERROR: %s" % repr(e), file=sys.stderr)

-         sys.exit(1)

-     print(" SUCCESS")

Do note that this branch is based on the test framework rework branch.
As a result, this pull request depends on that branch being merged first.

I interpreted your comment to mean that it was reasonable for me to review only the 4 newest commits and ignore the others (since they are in another PR presumably?). LGTM on the 4 newest!

Other than the nit, LGTM for the last 4 commits.

Thanks. As soon as I get an ack on #280, I'll also get this one merged.

8 new commits added

  • saml2: Do not send a RelayState with POST binding if not requested
  • tests.test1: Add test case for no RelayState
  • tests.helpers: Allow limiting redirects and posting forms
  • Replace mutable default arguments
  • Use automated test detection in Makefile
  • Update tests to use new test framework
  • Rework test framework
  • Allow passing stdout and stderr to tests
6 years ago

Commit cd46749 fixes this pull-request

Pull-Request has been merged by puiterwijk@redhat.com

6 years ago