|
#!/usr/bin/python |
|
# |
|
# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING |
|
|
import ConfigParser |
|
import io |
|
import os |
|
import pwd |
|
import shutil |
|
import signal |
|
import random |
|
from string import Template |
|
import subprocess |
|
|
from control import TC # pylint: disable=relative-import |
|
|
|
WRAP_HOSTNAME = 'idp.ipsilon.dev' |
|
TESTREALM = 'IPSILON.DEV' |
|
TESTDOMAIN = 'ipsilon.dev' |
|
KDC_DBNAME = 'db.file' |
|
KDC_STASH = 'stash.file' |
|
KDC_PASSWORD = 'ipsilon' |
|
KRB5_CONF_TEMPLATE = ''' |
|
[libdefaults] |
|
default_realm = ${TESTREALM} |
|
dns_lookup_realm = false |
|
dns_lookup_kdc = false |
|
rdns = false |
|
ticket_lifetime = 24h |
|
forwardable = yes |
|
default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX |
|
udp_preference_limit = 0 |
|
|
[realms] |
|
${TESTREALM} = { |
|
kdc =${WRAP_HOSTNAME}:8888 |
|
} |
|
|
[domain_realm] |
|
.${TESTDOMAIN} = ${TESTREALM} |
|
${TESTDOMAIN} = ${TESTREALM} |
|
|
[dbmodules] |
|
${TESTREALM} = { |
|
database_name = ${KDCDIR}/${KDC_DBNAME} |
|
} |
|
''' |
|
|
KDC_CONF_TEMPLATE = ''' |
|
[kdcdefaults] |
|
kdc_ports = 8888 |
|
kdc_tcp_ports = 8888 |
|
restrict_anonymous_to_tgt = true |
|
|
[realms] |
|
${TESTREALM} = { |
|
master_key_type = aes256-cts |
|
max_life = 7d |
|
max_renewable_life = 14d |
|
acl_file = ${KDCDIR}/kadm5.acl |
|
dict_file = /usr/share/dict/words |
|
default_principal_flags = +preauth |
|
admin_keytab = ${TESTREALM}/kadm5.keytab |
|
key_stash_file = ${KDCDIR}/${KDC_STASH} |
|
} |
|
[logging] |
|
kdc = FILE:${KDCLOG} |
|
''' |
|
|
USER_KTNAME = "user.keytab" |
|
HTTP_KTNAME = "http.keytab" |
|
KEY_TYPE = "aes256-cts-hmac-sha1-96:normal" |
|
|
|
class IpsilonTestBase(object): |
|
|
def __init__(self, name, execname, allow_wrappers=True): |
|
self.name = name |
|
self.execname = execname |
|
self.rootdir = os.getcwd() |
|
self.testdir = None |
|
self.testuser = pwd.getpwuid(os.getuid())[0] |
|
self.processes = [] |
|
self.allow_wrappers = allow_wrappers |
|
self.current_setup_step = None |
|
self.print_cases = False |
|
self.stdout = None |
|
self.stderr = None |
|
|
def platform_supported(self): |
|
"""This return whether the current platform supports this test. |
|
|
This is used for example with specific modules or features that are not |
|
supported on all platforms due to dependency availability. |
|
|
If the platform is supported, it returns None. |
|
Otherwise it returns a string indicating why the platform does not |
|
support the current test. |
|
""" |
|
# Every test defaults to being available on every platform |
|
return None |
|
|
def force_remove(self, op, name, info): |
|
os.chmod(name, 0700) |
|
os.remove(name) |
|
|
def setup_base(self, path, test): |
|
self.testdir = os.path.join(path, test.name) |
|
if os.path.exists(self.testdir): |
|
shutil.rmtree(self.testdir, onerror=self.force_remove) |
|
os.makedirs(self.testdir) |
|
shutil.copytree(os.path.join(self.rootdir, 'templates'), |
|
os.path.join(self.testdir, 'templates')) |
|
os.mkdir(os.path.join(self.testdir, 'etc')) |
|
os.mkdir(os.path.join(self.testdir, 'lib')) |
|
os.mkdir(os.path.join(self.testdir, 'lib', test.name)) |
|
os.mkdir(os.path.join(self.testdir, 'log')) |
|
os.mkdir(os.path.join(self.testdir, 'cache')) |
|
self.setup_ca() |
|
|
def setup_ca(self): |
|
# Prepare the cert stuff for this run |
|
os.mkdir(os.path.join(self.testdir, 'certs')) |
|
cmd = ['openssl', 'req', '-newkey', 'rsa:1024', '-days', '10', |
|
'-x509', '-nodes', '-subj', '/CN=Ipsilon Test CA', |
|
'-keyout', os.path.join(self.testdir, 'certs', 'root.key.pem'), |
|
'-out', os.path.join(self.testdir, 'certs', 'root.cert.pem')] |
|
subprocess.check_call(cmd, |
|
stdout=self.stdout, stderr=self.stderr) |
|
open(os.path.join(self.testdir, 'certs', 'db'), 'w').close() |
|
|
with open(os.path.join(self.testdir, 'certs', 'serial'), 'w') as ser: |
|
ser.write('000b') |
|
|
with open(os.path.join(self.testdir, 'certs', |
|
'openssl.conf'), 'w') as conf: |
|
conf.write("""[ ca ] |
|
default_ca = myca |
|
[ myca ] |
|
database = %(certdir)s/db |
|
serial = %(certdir)s/serial |
|
x509_extensions = myca_extensions |
|
policy = myca_policy |
|
[ myca_policy ] |
|
commonName = supplied |
|
[ alt_names ] |
|
DNS.1 = ${ENV::ADDR} |
|
IP.1 = ${ENV::IPADDR} |
|
[ myca_extensions ] |
|
subjectKeyIdentifier = hash |
|
subjectAltName = @alt_names |
|
basicConstraints = CA:false""" % {'certdir': os.path.join(self.testdir, |
|
'certs')}) |
|
|
def generate_profile(self, global_opts, args_opts, name, addr, port, |
|
nameid='unspecified'): |
|
args_opts['port'] = port |
|
|
newconf = ConfigParser.ConfigParser() |
|
newconf.add_section('globals') |
|
for k in global_opts: |
|
newconf.set('globals', k, global_opts[k]) |
|
newconf.add_section('arguments') |
|
for k in args_opts: |
|
newconf.set('arguments', k, args_opts[k]) |
|
|
profile = io.BytesIO() |
|
newconf.write(profile) |
|
|
t = Template(profile.getvalue()) |
|
text = t.substitute({'NAME': name, 'ADDRESS': addr, 'PORT': port, |
|
'TESTDIR': self.testdir, |
|
'ROOTDIR': self.rootdir, |
|
'NAMEID': nameid, |
|
'HTTP_KTNAME': HTTP_KTNAME, |
|
'TEST_USER': self.testuser}) |
|
|
filename = os.path.join(self.testdir, '%s_profile.cfg' % name) |
|
with open(filename, 'wb') as f: |
|
f.write(text) |
|
|
return filename |
|
|
def setup_http(self, name, addr, port): |
|
httpdir = os.path.join(self.testdir, name) |
|
os.mkdir(httpdir) |
|
os.mkdir(os.path.join(httpdir, 'conf.d')) |
|
os.mkdir(os.path.join(httpdir, 'html')) |
|
os.mkdir(os.path.join(httpdir, 'logs')) |
|
os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules')) |
|
|
with open(os.path.join(self.rootdir, 'tests/httpd.conf')) as f: |
|
t = Template(f.read()) |
|
text = t.substitute({'HTTPROOT': httpdir, |
|
'HTTPADDR': addr, |
|
'HTTPPORT': port, |
|
'NAME': name, |
|
'CERTROOT': os.path.join(self.testdir, |
|
'certs')}) |
|
filename = os.path.join(httpdir, 'httpd.conf') |
|
with open(filename, 'w+') as f: |
|
f.write(text) |
|
|
certpath = os.path.join(self.testdir, 'certs', '%s.pem' % name) |
|
keypath = os.path.join(self.testdir, 'certs', '%s.key' % name) |
|
self.generate_cert(name, addr, certpath, keypath) |
|
|
return filename |
|
|
def generate_cert(self, name, addr, certpath, keypath): |
|
# Generate certs for this setup |
|
cmd = ['openssl', 'req', '-newkey', 'rsa:1024', '-nodes', |
|
'-out', '%s.csr' % certpath, |
|
'-keyout', keypath, |
|
'-subj', '/CN=Ipsilon Test %s' % name] |
|
subprocess.check_call(cmd, |
|
stdout=self.stdout, stderr=self.stderr) |
|
cmd = ['openssl', 'ca', '-batch', '-notext', '-days', '2', |
|
'-md', 'sha1', |
|
'-subj', '/CN=Ipsilon Test %s' % name, |
|
'-outdir', os.path.join(self.testdir, 'certs'), |
|
'-keyfile', os.path.join(self.testdir, 'certs', 'root.key.pem'), |
|
'-cert', os.path.join(self.testdir, 'certs', 'root.cert.pem'), |
|
'-config', os.path.join(self.testdir, 'certs', 'openssl.conf'), |
|
'-in', '%s.csr' % certpath, |
|
'-out', certpath] |
|
ipaddr = addr |
|
if not ipaddr.startswith('127.'): |
|
# Lazy check whether this is a hostname (like in testnameid) |
|
# Note: this IP address might not be correct, but if when the |
|
# hostname is consistently used, that doesn't matter. |
|
# We just set it to a known value to make sure openssl doesn't |
|
# crash. |
|
ipaddr = '127.0.0.10' |
|
subprocess.check_call(cmd, env={'ADDR': addr, 'IPADDR': ipaddr}, |
|
stdout=self.stdout, stderr=self.stderr) |
|
|
def setup_idp_server(self, profile, name, addr, port, env): |
|
http_conf_file = self.setup_http(name, addr, port) |
|
logfile = os.path.join(self.testdir, name, 'logs', 'install.log') |
|
if env: |
|
env['LOGFILE'] = logfile |
|
else: |
|
env = {'LOGFILE': logfile} |
|
cmd = [os.path.join(self.rootdir, |
|
'ipsilon/install/ipsilon-server-install'), |
|
'--config-profile=%s' % profile] |
|
subprocess.check_call(cmd, env=env, |
|
stdout=self.stdout, stderr=self.stderr) |
|
os.symlink(os.path.join(self.rootdir, 'ipsilon'), |
|
os.path.join(self.testdir, 'lib', name, 'ipsilon')) |
|
|
return http_conf_file |
|
|
def setup_sp_server(self, profile, name, addr, port, env): |
|
http_conf_file = self.setup_http(name, addr, port) |
|
cmd = [os.path.join(self.rootdir, |
|
'ipsilon/install/ipsilon-client-install'), |
|
'--config-profile=%s' % profile] |
|
subprocess.check_call(cmd, env=env, |
|
stdout=self.stdout, stderr=self.stderr) |
|
|
return http_conf_file |
|
|
def setup_pgdb(self, datadir, env): |
|
cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir, '-o', '-E UNICODE'] |
|
subprocess.check_call(cmd, env=env, |
|
stdout=self.stdout, stderr=self.stderr) |
|
auth = 'host all all 127.0.0.1/24 trust\n' |
|
filename = os.path.join(datadir, 'pg_hba.conf') |
|
with open(filename, 'a') as f: |
|
f.write(auth) |
|
|
def start_etcd_server(self, datadir, addr, clientport, srvport, env): |
|
env['ETCD_NAME'] = 'ipsilon' |
|
env['ETCD_DATA_DIR'] = datadir |
|
env['ETCD_LISTEN_CLIENT_URLS'] = 'http://%s:%s' % (addr, clientport) |
|
env['ETCD_LISTEN_PEER_URLS'] = 'http://%s:%s' % (addr, srvport) |
|
env['ETCD_FORCE_NEW_CLUSTER'] = 'true' |
|
env['ETCD_INITIAL_CLUSTER'] = 'ipsilon=http://%s:%s' % (addr, srvport) |
|
env['ETCD_ADVERTISE_CLIENT_URLS'] = 'http://%s:%s' % (addr, clientport) |
|
env['ETCD_INITIAL_ADVERTISE_PEER_URLS'] = 'http://%s:%s' % (addr, |
|
srvport) |
|
p = subprocess.Popen(['/usr/bin/etcd'], |
|
env=env, preexec_fn=os.setsid, |
|
stdout=self.stdout, stderr=self.stderr) |
|
self.processes.append(p) |
|
return p |
|
|
def start_http_server(self, conf, env): |
|
env['MALLOC_CHECK_'] = '3' |
|
env['MALLOC_PERTURB_'] = str(random.randint(0, 32767) % 255 + 1) |
|
env['REQUESTS_CA_BUNDLE'] = os.path.join(self.testdir, 'certs', |
|
'root.cert.pem') |
|
p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf], |
|
env=env, preexec_fn=os.setsid, |
|
stdout=self.stdout, stderr=self.stderr) |
|
self.processes.append(p) |
|
return p |
|
|
def start_pgdb_server(self, datadir, rundir, log, addr, port, env): |
|
p = subprocess.Popen(['/usr/bin/pg_ctl', 'start', '-D', datadir, '-o', |
|
'-k %s -c port=%s -c \ |
|
listen_addresses=%s' % (rundir, port, addr), |
|
'-l', log, '-w'], |
|
env=env, preexec_fn=os.setsid, |
|
stdout=self.stdout, stderr=self.stderr) |
|
self.processes.append(p) |
|
p.wait() |
|
for d in ['adminconfig', 'users', 'transactions', 'sessions', |
|
'saml2.sessions.db']: |
|
cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d] |
|
subprocess.check_call(cmd, env=env, |
|
stdout=self.stdout, stderr=self.stderr) |
|
|
def setup_ldap(self, env): |
|
ldapdir = os.path.join(self.testdir, 'ldap') |
|
os.mkdir(ldapdir) |
|
with open(os.path.join(self.rootdir, 'tests/slapd.conf')) as f: |
|
t = Template(f.read()) |
|
text = t.substitute({'ldapdir': ldapdir}) |
|
filename = os.path.join(ldapdir, 'slapd.conf') |
|
with open(filename, 'w+') as f: |
|
f.write(text) |
|
subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l', |
|
'tests/ldapdata.ldif'], env=env, |
|
stdout=self.stdout, stderr=self.stderr) |
|
|
return filename |
|
|
def start_ldap_server(self, conf, addr, port, env): |
|
p = subprocess.Popen(['/usr/sbin/slapd', '-d', '0', '-f', conf, |
|
'-h', 'ldap://%s:%s' % (addr, port)], |
|
env=env, preexec_fn=os.setsid, |
|
stdout=self.stdout, stderr=self.stderr) |
|
self.processes.append(p) |
|
|
def setup_kdc(self, env): |
|
|
# setup kerberos environment |
|
testlog = os.path.join(self.testdir, 'kerb.log') |
|
krb5conf = os.path.join(self.testdir, 'krb5.conf') |
|
kdcconf = os.path.join(self.testdir, 'kdc.conf') |
|
kdcdir = os.path.join(self.testdir, 'kdc') |
|
if os.path.exists(kdcdir): |
|
shutil.rmtree(kdcdir) |
|
os.makedirs(kdcdir) |
|
|
t = Template(KRB5_CONF_TEMPLATE) |
|
text = t.substitute({'TESTREALM': TESTREALM, |
|
'TESTDOMAIN': TESTDOMAIN, |
|
'TESTDIR': self.testdir, |
|
'KDCDIR': kdcdir, |
|
'KDC_DBNAME': KDC_DBNAME, |
|
'WRAP_HOSTNAME': WRAP_HOSTNAME}) |
|
with open(krb5conf, 'w+') as f: |
|
f.write(text) |
|
|
t = Template(KDC_CONF_TEMPLATE) |
|
text = t.substitute({'TESTREALM': TESTREALM, |
|
'KDCDIR': kdcdir, |
|
'KDCLOG': testlog, |
|
'KDC_STASH': KDC_STASH}) |
|
with open(kdcconf, 'w+') as f: |
|
f.write(text) |
|
|
kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin', |
|
'KRB5_CONFIG': krb5conf, |
|
'KRB5_KDC_PROFILE': kdcconf} |
|
kdcenv.update(env) |
|
|
with (open(testlog, 'a')) as logfile: |
|
ksetup = subprocess.Popen(["kdb5_util", "create", "-s", |
|
"-r", TESTREALM, "-P", KDC_PASSWORD], |
|
stdout=logfile, stderr=logfile, |
|
env=kdcenv, preexec_fn=os.setsid) |
|
ksetup.wait() |
|
if ksetup.returncode != 0: |
|
raise ValueError('KDC Setup failed') |
|
|
kdcproc = subprocess.Popen(['krb5kdc', '-n'], |
|
env=kdcenv, preexec_fn=os.setsid, |
|
stdout=self.stdout, stderr=self.stderr) |
|
self.processes.append(kdcproc) |
|
|
return kdcenv |
|
|
def kadmin_local(self, cmd, env, logfile): |
|
ksetup = subprocess.Popen(["kadmin.local", "-q", cmd], |
|
stdout=logfile, stderr=logfile, |
|
env=env, preexec_fn=os.setsid) |
|
ksetup.wait() |
|
if ksetup.returncode != 0: |
|
raise ValueError('Kadmin local [%s] failed' % cmd) |
|
|
def setup_keys(self, env): |
|
|
testlog = os.path.join(self.testdir, 'kerb.log') |
|
|
svc_name = "HTTP/%s" % WRAP_HOSTNAME |
|
svc_keytab = os.path.join(self.testdir, HTTP_KTNAME) |
|
cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name) |
|
with (open(testlog, 'a')) as logfile: |
|
self.kadmin_local(cmd, env, logfile) |
|
cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name) |
|
with (open(testlog, 'a')) as logfile: |
|
self.kadmin_local(cmd, env, logfile) |
|
|
usr_keytab = os.path.join(self.testdir, USER_KTNAME) |
|
cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser) |
|
with (open(testlog, 'a')) as logfile: |
|
self.kadmin_local(cmd, env, logfile) |
|
cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser) |
|
with (open(testlog, 'a')) as logfile: |
|
self.kadmin_local(cmd, env, logfile) |
|
|
keys_env = {"KRB5_KTNAME": svc_keytab} |
|
keys_env.update(env) |
|
|
return keys_env |
|
|
def kinit_keytab(self, kdcenv): |
|
testlog = os.path.join(self.testdir, 'kinit.log') |
|
usr_keytab = os.path.join(self.testdir, USER_KTNAME) |
|
kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join( |
|
self.testdir, 'ccaches/user') |
|
with (open(testlog, 'a')) as logfile: |
|
logfile.write("\n%s\n" % kdcenv) |
|
ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab, |
|
self.testuser], |
|
stdout=logfile, stderr=logfile, |
|
env=kdcenv, preexec_fn=os.setsid) |
|
ksetup.wait() |
|
if ksetup.returncode != 0: |
|
raise ValueError('kinit %s failed' % self.testuser) |
|
|
def wait(self): |
|
for p in self.processes: |
|
os.killpg(p.pid, signal.SIGTERM) |
|
|
def setup_servers(self, env=None): |
|
raise NotImplementedError() |
|
|
def setup_step(self, message): |
|
"""Method to inform setup step starting.""" |
|
self.current_setup_step = message |
|
|
def run(self, env): |
|
"""Method to run the test process and receive progress reports. |
|
|
The test process is run in a subprocess because it needs to be run with |
|
the socket and nss wrappers, which are used a LD_PRELOAD, which means |
|
the environment must be set before the process starts. |
|
|
The process running run() (Test Control process) communicates with the |
|
Test Process by reading specially formatted strings from standard out. |
|
|
All lines read from the test's stdout will be passed into TC.get_result |
|
to determine whether a test result was provided. |
|
""" |
|
exe = self.execname |
|
if exe.endswith('c'): |
|
exe = exe[:-1] |
|
return self.run_and_collect([exe], env) |
|
|
def run_and_collect(self, cmd, env): |
|
p = subprocess.Popen(cmd, env=env, |
|
stdout=subprocess.PIPE, stderr=self.stderr) |
|
results = [] |
|
for line in p.stdout: |
|
line = line[:-1] # Strip newline |
|
result = TC.get_result(line) |
|
if result: |
|
if self.print_cases: |
|
TC.output(result) |
|
results.append(result) |
|
else: |
|
if self.stdout is None: |
|
print(line) |
|
return p.wait(), results |