From 22d7492c94837342a559c368454c223f566490ac Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Mar 02 2017 13:45:00 +0000 Subject: Cleanup certdb * use with statement to open/close files * prefer fchmod/fchown when a file descriptor is available * set permission before data is written to file Signed-off-by: Christian Heimes Reviewed-By: Stanislav Laznicka --- diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 4f97801..660da79 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -117,10 +117,12 @@ class CertDB(object): self.host_name = host_name self.ca_subject = ca_subject self.subject_base = subject_base + try: - self.cwd = os.getcwd() + self.cwd = os.path.abspath(os.getcwd()) except OSError as e: - raise RuntimeError("Unable to determine the current directory: %s" % str(e)) + raise RuntimeError( + "Unable to determine the current directory: %s" % str(e)) self.cacert_name = get_ca_nickname(self.realm) @@ -163,6 +165,8 @@ class CertDB(object): def __del__(self): if self.reqdir is not None: shutil.rmtree(self.reqdir, ignore_errors=True) + self.reqdir = None + self.nssdb.close() try: os.chdir(self.cwd) except OSError: @@ -187,16 +191,16 @@ class CertDB(object): # sure we are in a unique place when this happens os.chdir(self.reqdir) - def set_perms(self, fname, write=False, uid=None): - if uid: - pent = pwd.getpwnam(uid) - os.chown(fname, pent.pw_uid, pent.pw_gid) - else: - os.chown(fname, self.uid, self.gid) + def set_perms(self, fname, write=False): perms = stat.S_IRUSR if write: perms |= stat.S_IWUSR - os.chmod(fname, perms) + if hasattr(fname, 'fileno'): + os.fchown(fname.fileno(), self.uid, self.gid) + os.fchmod(fname.fileno(), perms) + else: + os.chown(fname, self.uid, self.gid) + os.chmod(fname, perms) def run_certutil(self, args, stdin=None, **kwargs): return self.nssdb.run_certutil(args, stdin, **kwargs) @@ -212,19 +216,18 @@ class CertDB(object): def create_noise_file(self): if ipautil.file_exists(self.noise_fname): os.remove(self.noise_fname) - f = open(self.noise_fname, "w") - f.write(ipautil.ipa_generate_password()) - self.set_perms(self.noise_fname) + with open(self.noise_fname, "w") as f: + self.set_perms(f) + f.write(ipautil.ipa_generate_password()) def create_passwd_file(self, passwd=None): ipautil.backup_file(self.passwd_fname) - f = open(self.passwd_fname, "w") - if passwd is not None: - f.write("%s\n" % passwd) - else: - f.write(ipautil.ipa_generate_password()) - f.close() - self.set_perms(self.passwd_fname) + with open(self.passwd_fname, "w") as f: + self.set_perms(f) + if passwd is not None: + f.write("%s\n" % passwd) + else: + f.write(ipautil.ipa_generate_password()) def create_certdbs(self): self.nssdb.create_db(user=self.user, group=self.group, mode=self.mode, @@ -262,13 +265,13 @@ class CertDB(object): # export the CA cert for use with other apps ipautil.backup_file(cacert_fname) root_nicknames = self.find_root_cert(nickname)[:-1] - fd = open(cacert_fname, "w") - for root in root_nicknames: - result = self.run_certutil(["-L", "-n", root, "-a"], - capture_output=True) - fd.write(result.output) - fd.close() - os.chmod(cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + with open(cacert_fname, "w") as f: + os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + for root in root_nicknames: + result = self.run_certutil(["-L", "-n", root, "-a"], + capture_output=True) + f.write(result.output) + if create_pkcs12: ipautil.backup_file(self.pk12_fname) ipautil.run([paths.PK12UTIL, "-d", self.secdir, @@ -283,9 +286,8 @@ class CertDB(object): Load all the certificates from a given file. It is assumed that this file creates CA certificates. """ - fd = open(cacert_fname) - certs = fd.read() - fd.close() + with open(cacert_fname) as f: + certs = f.read() st = 0 while True: @@ -360,14 +362,14 @@ class CertDB(object): if subject is None: subject=DN(('CN', hostname), self.subject_base) self.request_cert(subject, san_dnsnames=[hostname]) - self.issue_server_cert(self.certreq_fname, self.certder_fname) - self.import_cert(self.certder_fname, nickname) - fd = open(self.certder_fname, "r") - dercert = fd.read() - fd.close() - - os.unlink(self.certreq_fname) - os.unlink(self.certder_fname) + try: + self.issue_server_cert(self.certreq_fname, self.certder_fname) + self.import_cert(self.certder_fname, nickname) + with open(self.certder_fname, "r") as f: + dercert = f.read() + finally: + os.unlink(self.certreq_fname) + os.unlink(self.certder_fname) return dercert @@ -397,10 +399,8 @@ class CertDB(object): if self.host_name is None: raise RuntimeError("CA Host is not set.") - f = open(certreq_fname, "r") - csr = f.readlines() - f.close() - csr = "".join(csr) + with open(certreq_fname, "r") as f: + csr = f.read() # We just want the CSR bits, make sure there is nothing else csr = pkcs10.strip_header(csr) @@ -442,9 +442,8 @@ class CertDB(object): # Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. - f = open(cert_fname, "w") - f.write(cert) - f.close() + with open(cert_fname, "w") as f: + f.write(cert) def issue_signing_cert(self, certreq_fname, cert_fname): self.setup_cert_request() @@ -452,10 +451,8 @@ class CertDB(object): if self.host_name is None: raise RuntimeError("CA Host is not set.") - f = open(certreq_fname, "r") - csr = f.readlines() - f.close() - csr = "".join(csr) + with open(certreq_fname, "r") as f: + csr = f.read() # We just want the CSR bits, make sure there is no thing else csr = pkcs10.strip_header(csr) @@ -489,9 +486,8 @@ class CertDB(object): # Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. - f = open(cert_fname, "w") - f.write(cert) - f.close() + with open(cert_fname, "w") as f: + f.write(cert) def add_cert(self, cert, nick, flags, pem=False): self.nssdb.add_cert(cert, nick, flags, pem) @@ -514,13 +510,11 @@ class CertDB(object): This is the format of Directory Server pin files. """ ipautil.backup_file(self.pin_fname) - f = open(self.pin_fname, "w") - f.write("Internal (Software) Token:") - pwdfile = open(self.passwd_fname) - f.write(pwdfile.read()) - f.close() - pwdfile.close() - self.set_perms(self.pin_fname) + with open(self.pin_fname, "w") as pinfile: + self.set_perms(pinfile) + pinfile.write("Internal (Software) Token:") + with open(self.passwd_fname) as pwdfile: + pinfile.write(pwdfile.read()) def find_root_cert(self, nickname): """ @@ -563,10 +557,9 @@ class CertDB(object): if ipautil.file_exists(self.certdb_fname): # We already have a cert db, see if it is for the same CA. # If it is we leave things as they are. - f = open(cacert_fname, "r") - newca = f.readlines() - f.close() - newca = "".join(newca) + with open(cacert_fname, "r") as f: + newca = f.read() + newca, _st = find_cert_from_txt(newca) cacert = self.get_cert_from_db(self.cacert_name)