| |
@@ -0,0 +1,418 @@
|
| |
+ #!/usr/bin/python3
|
| |
+ import argparse
|
| |
+ from argparse import RawTextHelpFormatter
|
| |
+ from datetime import datetime
|
| |
+ from dateutil.relativedelta import relativedelta
|
| |
+ import errno
|
| |
+ import os
|
| |
+ from cryptography.hazmat.backends import default_backend
|
| |
+ from cryptography.hazmat.primitives import serialization
|
| |
+ from cryptography.hazmat.primitives.asymmetric import rsa
|
| |
+ from cryptography import x509
|
| |
+ from cryptography.x509.oid import NameOID
|
| |
+ from cryptography.x509.oid import ExtendedKeyUsageOID
|
| |
+ from cryptography.hazmat.primitives import hashes
|
| |
+
|
| |
+
|
| |
# Long-form help text for the command-line interface. parse_args() passes it
# to argparse as the parser description together with RawTextHelpFormatter.
# NOTE(review): any indentation inside this string was not visible in the
# rendering this was taken from - confirm against the real file.
DESCRIPTION = """
Common Koji SSL admin operations:

1) Generate a server key and CSR for HTTPS on kojihub or kojiweb.
You will pass this CSR (cert signing request) to a master CA for signing.
You will need these even if you use Kerberos authentication.
You will also need these regardless if you use an external CA or a testing
one that you generate with this tool.

2) Generate a user account key and CSR.
You will pass this CSR (cert signing request) to a master CA for signing.
You only need these if you use SSL cert authentication (not Kerberos).
These might be usable with an external CA if you have one.

3) Generate a new master CA.
You only need this if you don't have an external CA in your environment
that can sign CSRs. (For example, in a testing environment.) Generates new
"koji-ca.key" and "koji-ca.crt" files. The CA is valid for ten years from
today.

4) Sign a CSR for a user or server with a master CA.
Like "master CA" above, you only need this if you don't have an external CA
in your environment. (For example, in a testing environment.)
You can use this to sign server certs or user certs with your CA.

Never share the .key files or post them in a public location.
"""
|
| |
+
|
| |
+
|
| |
def generate_key(path, force=False):
    """
    Generate a new private RSA key and write it to disk.

    The key is 2048-bit RSA with public exponent 65537. It is written
    unencrypted, PEM-encoded in the "traditional OpenSSL" format, to a file
    that is created with mode 0600.

    :param str path: path on disk to write the private key PEM file.
    :param bool force: if True, replace whatever currently exists at
                       ``path``. If False (the default), refuse to
                       overwrite.
    :returns: An instance of RSAPrivateKey.
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set.
    """
    if os.path.exists(path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    private_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    if force:
        # Delete any old file rather than truncating it, so the new file
        # gets our restrictive mode instead of inheriting the old one.
        # Unlinking unconditionally (instead of checking first) also removes
        # dangling symlinks and leaves no check-then-delete race.
        try:
            os.unlink(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
    # O_EXCL makes the open fail with EEXIST if anything appeared at this
    # path in the meantime, so the key is only ever written to a file that
    # we created ourselves with mode 0600.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    mode = 0o600
    with os.fdopen(os.open(path, flags, mode), 'wb') as f:
        f.write(private_bytes)
    return key
|
| |
+
|
| |
+
|
| |
def generate_server_csr(key, path, dnsnames, force=False):
    """
    Generate a CSR for an HTTPS Koji server (kojihub or kojiweb)

    The first DNS name becomes the subject common name, and every DNS name
    (including the first) is listed as a Subject Alternative Name. The CSR
    requests a non-CA certificate for server authentication only.

    :param key: private key material from rsa.generate_private_key() or
                serialization.load_pem_private_key().
    :type key: RSAPrivateKey
    :param str path: path on disk to write the CSR PEM file.
    :param iterable dnsnames: DNS hostnames to write in the CSR file. Must
                              contain at least one name.
    :param bool force: if True, overwrite an existing file at ``path``.
    :returns: an instance of cryptography.x509.CertificateSigningRequest
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set.
    :raises ValueError: if ``dnsnames`` is empty.
    """
    if os.path.exists(path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
    # We both index and iterate the names, so materialize them once. This
    # lets callers pass any iterable (for example a generator).
    dnsnames = list(dnsnames)
    if not dnsnames:
        raise ValueError('at least one DNS name is required')
    builder = x509.CertificateSigningRequestBuilder()
    # Build the basic cert with the simple (deprecated) common name:
    common_name = dnsnames[0]  # "mysite.com"
    subject_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
    ])
    builder = builder.subject_name(subject_name)
    # This is not a CA:
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    # Add Subject Alternative Names for all our dnsnames:
    subject_alt_names = [x509.DNSName(dnsname) for dnsname in dnsnames]
    builder = builder.add_extension(
        x509.SubjectAlternativeName(subject_alt_names), critical=False,
    )
    # Server authentication only:
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]),
        critical=False,
    )
    # Sign the CSR with our private key:
    csr = builder.sign(key, hashes.SHA256(), default_backend())
    # Write our CSR out to disk.
    public_bytes = csr.public_bytes(serialization.Encoding.PEM)
    with open(path, 'wb') as f:
        f.write(public_bytes)
    return csr
|
| |
+
|
| |
+
|
| |
def generate_user_csr(key, path, username, force=False):
    """
    Generate a CSR for a Koji user account

    The username becomes the subject common name. The CSR requests a non-CA
    certificate for client authentication only, and carries no Subject
    Alternative Name.

    :param key: private key material from rsa.generate_private_key() or
                serialization.load_pem_private_key().
    :type key: RSAPrivateKey
    :param str path: path on disk to write the CSR PEM file.
    :param str username: Koji account name to use as the common name.
    :param bool force: if True, overwrite an existing file at ``path``.
    :returns: an instance of cryptography.x509.CertificateSigningRequest
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set.
    """
    if os.path.exists(path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
    # Build the basic cert with the simple common name:
    subject_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, username),
    ])
    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject_name)
    # This is not a CA:
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    # Client authentication only:
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CLIENT_AUTH]),
        critical=False,
    )
    # Sign the CSR with our private key:
    csr = builder.sign(key, hashes.SHA256(), default_backend())
    # Write our CSR out to disk.
    public_bytes = csr.public_bytes(serialization.Encoding.PEM)
    with open(path, 'wb') as f:
        f.write(public_bytes)
    return csr
|
| |
+
|
| |
+
|
| |
def generate_ca(key, path, name, force=False):
    """
    Generate a new self-signed CA certificate for Koji use.

    The certificate is valid from one day before now until ten years from
    now, with both times computed in UTC. It is marked as a CA with a path
    length of zero, so it can sign end-entity certificates but not further
    intermediate CAs.

    :param key: private key material from rsa.generate_private_key() or
                serialization.load_pem_private_key().
    :type key: RSAPrivateKey
    :param str path: path on disk to write the public crt PEM file.
    :param str name: string to use as the "Common Name" for this CA.
                     For example, "Koji CA".
    :param bool force: if True, overwrite an existing file at ``path``.
    :returns: an instance of cryptography.x509.Certificate
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set.
    """
    # Local import: only this validity calculation needs it.
    from datetime import timezone

    if os.path.exists(path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
    public_key = key.public_key()
    public_key_id = x509.SubjectKeyIdentifier.from_public_key(public_key)
    x509_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, name)])
    builder = x509.CertificateBuilder()
    # Self-signed: the subject and the issuer are the same name.
    builder = builder.subject_name(x509_name)
    builder = builder.issuer_name(x509_name)
    # X.509 validity times are UTC. A naive local timestamp would be
    # interpreted as UTC and shift the window by the local UTC offset, so
    # take one timezone-aware "now" and derive both ends from it.
    now = datetime.now(timezone.utc)
    # Backdate the start by a day to tolerate clock skew between hosts.
    one_day_ago = now - relativedelta(days=1)
    in_ten_years = now + relativedelta(years=10)
    builder = builder.not_valid_before(one_day_ago)
    builder = builder.not_valid_after(in_ten_years)
    serial_number = x509.random_serial_number()
    builder = builder.serial_number(serial_number)
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True,
    )
    # Allow signing certificates and CRLs, but not encryption:
    builder = builder.add_extension(
        extension=x509.KeyUsage(
            digital_signature=True,
            key_encipherment=False,
            content_commitment=False,
            data_encipherment=False,
            key_agreement=False,
            encipher_only=False,
            decipher_only=False,
            key_cert_sign=True,
            crl_sign=True
        ),
        critical=True
    )
    builder = builder.add_extension(public_key_id, critical=False)
    # The authority is this very certificate, so the authority key
    # identifier points back at our own key id, name and serial number.
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier(public_key_id.digest,
                                    [x509.DirectoryName(x509_name)],
                                    serial_number),
        critical=False,
    )
    certificate = builder.sign(
        private_key=key, algorithm=hashes.SHA256(),
        backend=default_backend()
    )
    # Write our .crt out to disk.
    public_bytes = certificate.public_bytes(serialization.Encoding.PEM)
    with open(path, 'wb') as f:
        f.write(public_bytes)
    return certificate
|
| |
+
|
| |
+
|
| |
def sign_with_ca(csr_path, ca_key_path, ca_crt_path, crt_path, force=False):
    """
    Sign a CSR with a CA keypair.

    The new certificate copies the subject, the public key, the
    ExtendedKeyUsage extension and (if present) the SubjectAlternativeName
    extension from the CSR. It is valid for two years from the current UTC
    time and is never a CA itself.

    :param str csr_path: path to .csr file
    :param str ca_key_path: path to CA .key file (unencrypted PEM)
    :param str ca_crt_path: path to CA .crt file
    :param str crt_path: path on disk to write the public crt PEM file.
    :param bool force: if True, overwrite an existing file at ``crt_path``.
    :returns: an instance of cryptography.x509.Certificate
    :raises OSError: with errno EEXIST if ``crt_path`` already exists and
                     ``force`` is not set.
    :raises ValueError: if the CSR's own signature does not verify.
    :raises cryptography.x509.ExtensionNotFound: if the CSR has no
        ExtendedKeyUsage extension, or if it has no SubjectAlternativeName
        and is not a client-authentication-only request.
    """
    # Local import: only this validity calculation needs it.
    from datetime import timezone

    if os.path.exists(crt_path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), crt_path)
    with open(csr_path, 'rb') as f:
        csr = x509.load_pem_x509_csr(f.read(), default_backend())
    # Proof of possession: refuse to certify a public key unless the
    # requester signed the CSR with the matching private key.
    if not csr.is_signature_valid:
        raise ValueError('%s has an invalid signature' % csr_path)
    with open(ca_key_path, 'rb') as f:
        key = serialization.load_pem_private_key(f.read(),
                                                 password=None,
                                                 backend=default_backend())
    with open(ca_crt_path, 'rb') as f:
        ca_crt = x509.load_pem_x509_certificate(f.read(), default_backend())
    public_key = key.public_key()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(csr.subject)
    builder = builder.issuer_name(ca_crt.subject)
    builder = builder.public_key(csr.public_key())
    serial_number = x509.random_serial_number()
    builder = builder.serial_number(serial_number)
    # X.509 validity times are UTC. A naive local timestamp would be
    # interpreted as UTC, so a certificate signed east of UTC would not be
    # valid yet. Take one timezone-aware "now" for both ends of the window.
    now = datetime.now(timezone.utc)
    in_two_years = now + relativedelta(years=2)
    builder = builder.not_valid_before(now)
    builder = builder.not_valid_after(in_two_years)
    # End-entity usage: may sign and encrypt data, may not sign certs/CRLs.
    builder = builder.add_extension(
        extension=x509.KeyUsage(
            digital_signature=True,
            key_encipherment=True,
            content_commitment=True,
            data_encipherment=True,
            key_agreement=False,
            encipher_only=False,
            decipher_only=False,
            key_cert_sign=False,
            crl_sign=False
        ),
        critical=True
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(csr.public_key()),
        critical=False,
    )
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key),
        critical=False,
    )
    # Require ExtendedKeyUsage from the CSR
    eku = csr.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
    builder = builder.add_extension(eku.value, critical=False)
    # Add any SubjectAlternativeName extension in the CSR:
    san_cls = x509.SubjectAlternativeName
    try:
        san = csr.extensions.get_extension_for_class(san_cls)
        builder = builder.add_extension(san.value, critical=False)
    except x509.extensions.ExtensionNotFound:
        # We expect user client certs to not have a SubjectAlternativeName.
        # Anything else (for example a server cert) must have one.
        client_auth = x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CLIENT_AUTH])
        if eku.value != client_auth:
            raise
    certificate = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend()
    )
    # Write our .crt out to disk.
    public_bytes = certificate.public_bytes(serialization.Encoding.PEM)
    with open(crt_path, 'wb') as f:
        f.write(public_bytes)
    return certificate
|
| |
+
|
| |
+
|
| |
def server_csr(args):
    """
    Handle the "server-csr" sub-command.

    Writes a new private key and a matching server CSR into the current
    working directory. Both files are named after the first DNS name.

    :param args: parsed arguments providing ``force`` and ``dnsnames``.
    """
    overwrite = args.force
    hostnames = args.dnsnames
    # The first hostname doubles as the base name of the output files.
    stem = hostnames[0]
    key_file = '%s.key' % stem
    csr_file = '%s.csr' % stem
    private_key = generate_key(key_file, overwrite)
    print('wrote %s - protect this private file' % key_file)
    generate_server_csr(private_key, csr_file, hostnames, overwrite)
    print('wrote %s - sign this with a CA' % csr_file)
|
| |
+
|
| |
+
|
| |
def user_csr(args):
    """
    Handle the "user-csr" sub-command.

    Writes a new private key and a matching user CSR into the current
    working directory. Both files are named after the username.

    :param args: parsed arguments providing ``force`` and ``username``.
    """
    overwrite = args.force
    account = args.username
    key_file = '%s.key' % account
    csr_file = '%s.csr' % account
    private_key = generate_key(key_file, overwrite)
    print('wrote %s - protect this private file' % key_file)
    generate_user_csr(private_key, csr_file, account, overwrite)
    print('wrote %s - sign this with a CA' % csr_file)
|
| |
+
|
| |
+
|
| |
def new_ca(args):
    """
    Handle the "new-ca" sub-command.

    Writes "koji-ca.key" and "koji-ca.crt" into the current working
    directory.

    :param args: parsed arguments providing ``force`` and ``common_name``.
    """
    overwrite = args.force
    ca_name = args.common_name
    key_file, crt_file = 'koji-ca.key', 'koji-ca.crt'
    private_key = generate_key(key_file, overwrite)
    print('wrote %s - protect this private file' % key_file)
    generate_ca(private_key, crt_file, ca_name, overwrite)
    print('wrote %s - publish this for users' % crt_file)
|
| |
+
|
| |
+
|
| |
def sign(args):
    """
    Handle the "sign" sub-command.

    Signs the CSR with the CA key and certificate, and writes the result
    next to the CSR with a ".crt" extension.

    :param args: parsed arguments providing ``force``, ``ca_key``,
                 ``ca_cert`` and ``csr``.
    """
    force = args.force
    ca_key_path = args.ca_key
    ca_crt_path = args.ca_cert
    csr_path = args.csr
    # Only swap a trailing ".csr" extension. A plain str.replace() would
    # also rewrite ".csr" inside directory names, and for a file without a
    # ".csr" extension it would make the output path equal to the input,
    # so that --force overwrites the CSR itself.
    base, ext = os.path.splitext(csr_path)
    if ext == '.csr':
        crt_path = base + '.crt'
    else:
        crt_path = csr_path + '.crt'
    sign_with_ca(csr_path, ca_key_path, ca_crt_path, crt_path, force)
    print('wrote %s - publish this for users' % crt_path)
|
| |
+
|
| |
+
|
| |
def parse_args(argv=None):
    """
    Build the command-line parser and parse the arguments.

    Four sub-commands are defined: "server-csr", "user-csr", "new-ca" and
    "sign". Each one stores its handler function as the ``func`` default.

    :param argv: optional list of argument strings to parse. Defaults to
                 None, in which case argparse reads the process arguments.
    :returns: an argparse.Namespace whose ``func`` attribute is the handler
              for the selected sub-command.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION,
                                     formatter_class=RawTextHelpFormatter)
    # top-level subcommands:
    subparsers = parser.add_subparsers(dest='subcommand')
    # Require a sub-command, so that "func" is always set on the result.
    subparsers.required = True

    # server-csr sub-command:
    server_csr_help = ('Generate a new server key and CSR for an HTTPS server '
                       '(eg. kojihub or kojiweb).')
    server_csr_parser = subparsers.add_parser('server-csr',
                                              help=server_csr_help)
    server_csr_parser.add_argument('--force', action='store_true',
                                   help='overwrite .key and .csr files if '
                                        'they exist')
    dnsnames_help = ('fully-qualified domain name, for example '
                     '"kojihub.example.com" or "kojiweb.example.com". If you '
                     'will use more than one DNS name for the same server, '
                     'list them all here to put them all into one '
                     'certificate.')
    server_csr_parser.add_argument('dnsnames', metavar='fqdn',
                                   nargs='+', help=dnsnames_help)
    server_csr_parser.set_defaults(func=server_csr)

    # user-csr sub-command:
    user_csr_help = 'Generate a new key and CSR for a koji user account.'
    user_csr_parser = subparsers.add_parser('user-csr',
                                            help=user_csr_help)
    user_csr_parser.add_argument('--force', action='store_true',
                                 help='overwrite .key and .csr files if they '
                                      'exist')
    user_csr_parser.add_argument('username',
                                 help='koji account name, eg. "kdreyer"')
    user_csr_parser.set_defaults(func=user_csr)

    # new-ca sub-command:
    new_ca_help = 'Generate a new self-signed SSL CA to sign CSRs.'
    new_ca_parser = subparsers.add_parser('new-ca', help=new_ca_help)
    new_ca_parser.add_argument('--force', action='store_true',
                               help='overwrite koji-ca.key and koji-ca.crt '
                                    'files if they exist')
    new_ca_parser.add_argument('--common-name', default='Koji CA',
                               help='optional common name to use in the '
                                    'public CA cert. For example, "My '
                                    'Company Koji CA". Defaults to "Koji CA".')
    new_ca_parser.set_defaults(func=new_ca)

    # sign sub-command:
    sign_help = 'Sign a CSR with our Koji CA.'
    sign_parser = subparsers.add_parser('sign', help=sign_help)
    sign_parser.add_argument('--force', action='store_true',
                             help='overwrite .crt file if it exists')
    sign_parser.add_argument('--ca-key', default='koji-ca.key',
                             help='path to koji-ca.key (default: current '
                                  'working directory)')
    sign_parser.add_argument('--ca-cert', default='koji-ca.crt',
                             help='path to koji-ca.crt (default: current '
                                  'working directory)')
    sign_parser.add_argument('csr',
                             help='csr file to sign, eg. "kdreyer.csr"')
    sign_parser.set_defaults(func=sign)

    return parser.parse_args(argv)
|
| |
+
|
| |
+
|
| |
def main():
    """
    Command-line entry point: parse the arguments and run the sub-command.

    If the sub-command fails because an output file already exists, print a
    hint about --force and exit with the error. Any other OSError is
    re-raised unchanged.
    """
    args = parse_args()
    try:
        args.func(args)
    except OSError as err:
        # Only the "file exists" case gets the friendly treatment.
        if err.errno != errno.EEXIST:
            raise
        print('Try the --force option to overwrite files')
        raise SystemExit(err)
|
| |
+
|
| |
+
|
| |
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()
|
| |
Add a tool that can generate the SSL certificates that Koji needs.
This provides a simple alternative to running openssl by hand.