#19 add koji-ssl-admin command
Merged 2 years ago by mikem. Opened 2 years ago by ktdreyer.
ktdreyer/koji-tools ssl-admin  into  master

file added
+418
@@ -0,0 +1,418 @@ 

+ #!/usr/bin/python3

+ import argparse

+ from argparse import RawTextHelpFormatter

+ from datetime import datetime

+ from dateutil.relativedelta import relativedelta

+ import errno

+ import os

+ from cryptography.hazmat.backends import default_backend

+ from cryptography.hazmat.primitives import serialization

+ from cryptography.hazmat.primitives.asymmetric import rsa

+ from cryptography import x509

+ from cryptography.x509.oid import NameOID

+ from cryptography.x509.oid import ExtendedKeyUsageOID

+ from cryptography.hazmat.primitives import hashes

+ 

+ 

# Long-form help text for the command-line interface. parse_args() passes it
# to argparse as the parser description together with RawTextHelpFormatter,
# so the line breaks and indentation below are shown to the user as written.
DESCRIPTION = """
Common Koji SSL admin operations:

1) Generate a server key and CSR for HTTPS on kojihub or kojiweb.
   You will pass this CSR (cert signing request) to a master CA for signing.
   You will need these even if you use Kerberos authentication.
   You will also need these regardless if you use an external CA or a testing
   one that you generate with this tool.

2) Generate a user account key and CSR.
   You will pass this CSR (cert signing request) to a master CA for signing.
   You only need these if you use SSL cert authentication (not Kerberos).
   These might be usable with an external CA if you have one.

3) Generate a new master CA.
   You only need this if you don't have an external CA in your environment
   that can sign CSRs. (For example, in a testing environment.) Generates new
   "koji-ca.key" and "koji-ca.crt" files. The CA is valid for ten years from
   today.

4) Sign a CSR for a user or server with a master CA.
   Like "master CA" above, you only need this if you don't have an external CA
   in your environment. (For example, in a testing environment.)
   You can use this to sign server certs or user certs with your CA.

Never share the .key files or post them in a public location.
"""

+ 

+ 

def generate_key(path, force=False):
    """
    Generate a new private RSA key and write it to disk.

    The key is 2048-bit RSA with public exponent 65537. It is written
    unencrypted, in PEM ("TraditionalOpenSSL") format, to a file that is
    freshly created with mode 0600.

    :param str path: path on disk to write the private key PEM file.
    :param bool force: if True, replace whatever already exists at ``path``.
                       If False (the default), refuse to overwrite.
    :returns: An instance of RSAPrivateKey.
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set, or if something re-creates
                     ``path`` while the key is being written.
    """
    if os.path.exists(path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    private_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    if force:
        # Remove any previous file (or dangling symlink) so the new file is
        # always created from scratch with the restrictive mode below.
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass
    # O_EXCL guarantees that we create the file ourselves: we never write
    # the private key through a symlink or into a pre-existing file whose
    # permissions or owner we did not choose.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    mode = 0o600
    with os.fdopen(os.open(path, flags, mode), 'wb') as f:
        f.write(private_bytes)
    return key

+ 

+ 

def generate_server_csr(key, path, dnsnames, force=False):
    """
    Generate a CSR for an HTTPS Koji server (kojihub or kojiweb)

    The first DNS name becomes the certificate's Common Name, and every
    name (including the first) is listed as a Subject Alternative Name.

    :param key: private key material from rsa.generate_private_key() or
                serialization.load_pem_private_key().
    :type key: RSAPrivateKey
    :param str path: path on disk to write the CSR PEM file.
    :param iterable dnsnames: DNS hostnames to write in the CSR file. A
                              single string is treated as one hostname.
    :param bool force: if True, overwrite an existing file at ``path``.
    :returns: an instance of cryptography.x509.CertificateSigningRequest
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set.
    :raises ValueError: if ``dnsnames`` is empty.
    """
    if os.path.exists(path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
    # Normalize the input: a bare string would otherwise be split into
    # single characters, and a one-shot iterator could not be both indexed
    # and iterated below.
    if isinstance(dnsnames, str):
        dnsnames = [dnsnames]
    else:
        dnsnames = list(dnsnames)
    if not dnsnames:
        raise ValueError('at least one DNS name is required')
    builder = x509.CertificateSigningRequestBuilder()
    # Build the basic cert with the simple (deprecated) common name:
    common_name = dnsnames[0]  # "mysite.com"
    subject_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
    ])
    builder = builder.subject_name(subject_name)
    # This is not a CA:
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    # Add Subject Alternative Names for all our dnsnames:
    subject_alt_names = [x509.DNSName(dnsname) for dnsname in dnsnames]
    builder = builder.add_extension(
        x509.SubjectAlternativeName(subject_alt_names), critical=False,
    )
    # Server authentication only:
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]),
        critical=False,
    )
    # Sign the CSR with our private key:
    csr = builder.sign(key, hashes.SHA256(), default_backend())
    # Write our CSR out to disk.
    public_bytes = csr.public_bytes(serialization.Encoding.PEM)
    with open(path, 'wb') as f:
        f.write(public_bytes)
    return csr

+ 

+ 

def generate_user_csr(key, path, username, force=False):
    """
    Create and save a certificate signing request for a Koji user account.

    The request carries the username as its Common Name, is marked as a
    non-CA certificate, and is restricted to TLS client authentication.

    :param key: private key material from rsa.generate_private_key() or
                serialization.load_pem_private_key().
    :type key: RSAPrivateKey
    :param str path: path on disk to write the CSR PEM file.
    :param str username: Koji account name to use as the Common Name.
    :param bool force: if True, overwrite an existing file at ``path``.
    :returns: an instance of cryptography.x509.CertificateSigningRequest
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set.
    """
    if not force and os.path.exists(path):
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)

    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, username)])
    # (extension, critical) pairs, added in this order.
    extensions = (
        # Not a CA.
        (x509.BasicConstraints(ca=False, path_length=None), True),
        # Client authentication only.
        (x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CLIENT_AUTH]), False),
    )

    request = x509.CertificateSigningRequestBuilder().subject_name(subject)
    for extension, is_critical in extensions:
        request = request.add_extension(extension, critical=is_critical)

    # Self-sign the request with the account's private key.
    signed = request.sign(key, hashes.SHA256(), default_backend())

    pem = signed.public_bytes(serialization.Encoding.PEM)
    with open(path, 'wb') as out:
        out.write(pem)
    return signed

+ 

+ 

def generate_ca(key, path, name, force=False):
    """
    Generate a new self-signed CA certificate for Koji use.

    The certificate is valid from one day before the current time (to
    tolerate clock skew between hosts) until ten years after it. It may
    sign end-entity certificates and CRLs but not further CAs
    (path length 0).

    :param key: private key material from rsa.generate_private_key() or
                serialization.load_pem_private_key().
    :type key: RSAPrivateKey
    :param str path: path on disk to write the public crt PEM file.
    :param str name: string to use as the "Common Name" for this CA.
                     For example, "Koji CA".
    :param bool force: if True, overwrite an existing file at ``path``.
    :returns: an instance of cryptography.x509.Certificate
    :raises OSError: with errno EEXIST if ``path`` already exists and
                     ``force`` is not set.
    """
    from datetime import timezone

    if os.path.exists(path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
    public_key = key.public_key()
    public_key_id = x509.SubjectKeyIdentifier.from_public_key(public_key)
    x509_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, name)])
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509_name)
    # Self-signed: the issuer is the subject.
    builder = builder.issuer_name(x509_name)
    # X.509 validity times are UTC. A naive local timestamp would be taken
    # as UTC and shift the validity window by the local UTC offset, so use
    # one timezone-aware instant for both bounds.
    now = datetime.now(timezone.utc)
    one_day_ago = now - relativedelta(days=1)
    in_ten_years = now + relativedelta(years=10)
    builder = builder.not_valid_before(one_day_ago)
    builder = builder.not_valid_after(in_ten_years)
    serial_number = x509.random_serial_number()
    builder = builder.serial_number(serial_number)
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True,
    )
    builder = builder.add_extension(
        extension=x509.KeyUsage(
            digital_signature=True,
            key_encipherment=False,
            content_commitment=False,
            data_encipherment=False,
            key_agreement=False,
            encipher_only=False,
            decipher_only=False,
            key_cert_sign=True,
            crl_sign=True
        ),
        critical=True
    )
    builder = builder.add_extension(public_key_id, critical=False)
    # The authority key identifier points back at this same certificate
    # (same key digest, name and serial number) because it is self-signed.
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier(public_key_id.digest,
                                    [x509.DirectoryName(x509_name)],
                                    serial_number),
        critical=False,
    )
    certificate = builder.sign(
        private_key=key, algorithm=hashes.SHA256(),
        backend=default_backend()
    )
    # Write our .crt out to disk.
    public_bytes = certificate.public_bytes(serialization.Encoding.PEM)
    with open(path, 'wb') as f:
        f.write(public_bytes)
    return certificate

+ 

+ 

def sign_with_ca(csr_path, ca_key_path, ca_crt_path, crt_path, force=False):
    """
    Sign a CSR with a CA keypair.

    The issued certificate copies the subject, public key, ExtendedKeyUsage
    and (if present) SubjectAlternativeName from the CSR. It is valid for
    two years starting from the current time.

    :param str csr_path: path to .csr file
    :param str ca_key_path: path to CA .key file (unencrypted PEM)
    :param str ca_crt_path: path to CA .crt file
    :param str crt_path: path on disk to write the public crt PEM file.
    :param bool force: if True, overwrite an existing file at ``crt_path``.
    :returns: an instance of cryptography.x509.Certificate
    :raises OSError: with errno EEXIST if ``crt_path`` already exists and
                     ``force`` is not set.
    :raises ValueError: if the CSR's own signature does not verify.
    :raises cryptography.x509.ExtensionNotFound: if the CSR has no
        ExtendedKeyUsage extension, or has no SubjectAlternativeName and is
        not a pure client-auth request.
    """
    from datetime import timezone

    if os.path.exists(crt_path) and not force:
        raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), crt_path)
    with open(csr_path, 'rb') as f:
        csr = x509.load_pem_x509_csr(f.read(), default_backend())
    # Refuse to issue a certificate for a request that was tampered with or
    # was not signed by the holder of the matching private key.
    if not csr.is_signature_valid:
        raise ValueError('invalid signature on CSR %s' % csr_path)
    with open(ca_key_path, 'rb') as f:
        key = serialization.load_pem_private_key(f.read(),
                                                 password=None,
                                                 backend=default_backend())
    with open(ca_crt_path, 'rb') as f:
        ca_crt = x509.load_pem_x509_certificate(f.read(), default_backend())
    public_key = key.public_key()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(csr.subject)
    builder = builder.issuer_name(ca_crt.subject)
    builder = builder.public_key(csr.public_key())
    serial_number = x509.random_serial_number()
    builder = builder.serial_number(serial_number)
    # X.509 validity times are UTC. A naive local timestamp would be taken
    # as UTC, so on hosts east of UTC the certificate would not be valid
    # yet for several hours after issuance.
    now = datetime.now(timezone.utc)
    in_two_years = now + relativedelta(years=2)
    builder = builder.not_valid_before(now)
    builder = builder.not_valid_after(in_two_years)
    builder = builder.add_extension(
        extension=x509.KeyUsage(
            digital_signature=True,
            key_encipherment=True,
            content_commitment=True,
            data_encipherment=True,
            key_agreement=False,
            encipher_only=False,
            decipher_only=False,
            key_cert_sign=False,
            crl_sign=False
        ),
        critical=True
    )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(csr.public_key()),
        critical=False,
    )
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key),
        critical=False,
    )
    # Require ExtendedKeyUsage from the CSR
    eku = csr.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
    builder = builder.add_extension(eku.value, critical=False)
    # Add any SubjectAlternativeName extension in the CSR:
    san_cls = x509.SubjectAlternativeName
    try:
        san = csr.extensions.get_extension_for_class(san_cls)
        builder = builder.add_extension(san.value, critical=False)
    except x509.extensions.ExtensionNotFound:
        # We expect user client certs to not have a SubjectAlternativeName.
        client_auth = x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CLIENT_AUTH])
        if eku.value != client_auth:
            raise
    certificate = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend()
    )
    # Write our .crt out to disk.
    public_bytes = certificate.public_bytes(serialization.Encoding.PEM)
    with open(crt_path, 'wb') as f:
        f.write(public_bytes)
    return certificate

+ 

+ 

def server_csr(args):
    """
    Handle the "server-csr" sub-command.

    Writes "<fqdn>.key" and "<fqdn>.csr" in the current directory, named
    after the first DNS name given on the command line.

    :param args: parsed arguments providing ``force`` and ``dnsnames``.
    """
    overwrite, hostnames = args.force, args.dnsnames
    primary = hostnames[0]
    key_file, csr_file = '%s.key' % primary, '%s.csr' % primary

    private_key = generate_key(key_file, overwrite)
    print('wrote %s - protect this private file' % key_file)

    generate_server_csr(private_key, csr_file, hostnames, overwrite)
    print('wrote %s - sign this with a CA' % csr_file)

+ 

+ 

def user_csr(args):
    """
    Handle the "user-csr" sub-command.

    Writes "<username>.key" and "<username>.csr" in the current directory.

    :param args: parsed arguments providing ``force`` and ``username``.
    """
    overwrite, account = args.force, args.username
    key_file, csr_file = '%s.key' % account, '%s.csr' % account

    private_key = generate_key(key_file, overwrite)
    print('wrote %s - protect this private file' % key_file)

    generate_user_csr(private_key, csr_file, account, overwrite)
    print('wrote %s - sign this with a CA' % csr_file)

+ 

+ 

def new_ca(args):
    """
    Handle the "new-ca" sub-command.

    Writes "koji-ca.key" and "koji-ca.crt" in the current directory.

    :param args: parsed arguments providing ``force`` and ``common_name``.
    """
    overwrite, ca_name = args.force, args.common_name
    key_file, crt_file = 'koji-ca.key', 'koji-ca.crt'

    ca_key = generate_key(key_file, overwrite)
    print('wrote %s - protect this private file' % key_file)

    generate_ca(ca_key, crt_file, ca_name, overwrite)
    print('wrote %s - publish this for users' % crt_file)

+ 

+ 

def sign(args):
    """
    Handle the "sign" sub-command.

    Signs the CSR at ``args.csr`` with the CA key and certificate, and
    writes the result next to the CSR with a ".crt" extension.

    :param args: parsed arguments providing ``force``, ``ca_key``,
                 ``ca_cert`` and ``csr``.
    """
    force = args.force
    ca_key_path = args.ca_key
    ca_crt_path = args.ca_cert
    csr_path = args.csr
    # Only swap a trailing ".csr" extension. A plain str.replace() would
    # also rewrite ".csr" inside directory names, and for a path without
    # ".csr" it would make the output path equal to the input path, so
    # that --force overwrites the CSR itself.
    root, ext = os.path.splitext(csr_path)
    if ext == '.csr':
        crt_path = root + '.crt'
    else:
        crt_path = csr_path + '.crt'
    sign_with_ca(csr_path, ca_key_path, ca_crt_path, crt_path, force)
    print('wrote %s - publish this for users' % crt_path)

+ 

+ 

def parse_args(argv=None):
    """
    Build the command-line parser and parse the arguments.

    Each sub-command (server-csr, user-csr, new-ca, sign) stores its
    handler function in the ``func`` attribute of the returned namespace.

    :param argv: optional list of argument strings to parse. Defaults to
                 None, in which case argparse reads ``sys.argv[1:]``.
    :returns: an argparse.Namespace with the parsed options.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION,
                                     formatter_class=RawTextHelpFormatter)
    # top-level subcommands:
    subparsers = parser.add_subparsers(dest='subcommand')
    # Setting "required" after creation makes a missing sub-command an
    # error.
    subparsers.required = True

    # server-csr sub-command:
    server_csr_help = ('Generate a new server key and CSR for an HTTPS '
                       'server (eg. kojihub or kojiweb).')
    server_csr_parser = subparsers.add_parser('server-csr',
                                              help=server_csr_help)
    server_csr_parser.add_argument('--force', action='store_true',
                                   help='overwrite .key and .csr files if '
                                        'they exist')
    dnsnames_help = ('fully-qualified domain name, for example '
                     '"kojihub.example.com" or "kojiweb.example.com". If you '
                     'will use more than one DNS name for the same server, '
                     'list them all here to put them all into one '
                     'certificate.')
    server_csr_parser.add_argument('dnsnames', metavar='fqdn',
                                   nargs='+', help=dnsnames_help)
    server_csr_parser.set_defaults(func=server_csr)

    # user-csr sub-command:
    user_csr_help = 'Generate a new key and CSR for a koji user account.'
    user_csr_parser = subparsers.add_parser('user-csr',
                                            help=user_csr_help)
    user_csr_parser.add_argument('--force', action='store_true',
                                 help='overwrite .key and .csr files if '
                                      'they exist')
    user_csr_parser.add_argument('username',
                                 help='koji account name, eg. "kdreyer"')
    user_csr_parser.set_defaults(func=user_csr)

    # new-ca sub-command:
    new_ca_help = 'Generate a new self-signed SSL CA to sign CSRs.'
    new_ca_parser = subparsers.add_parser('new-ca', help=new_ca_help)
    new_ca_parser.add_argument('--force', action='store_true',
                               help='overwrite koji-ca.key and koji-ca.crt '
                                    'files if they exist')
    new_ca_parser.add_argument('--common-name', default='Koji CA',
                               help='optional common name to use in the '
                                    'public CA cert. For example, "My '
                                    'Company Koji CA". Defaults to "Koji CA".')
    new_ca_parser.set_defaults(func=new_ca)

    # sign sub-command:
    sign_help = 'Sign a CSR with our Koji CA.'
    sign_parser = subparsers.add_parser('sign', help=sign_help)
    sign_parser.add_argument('--force', action='store_true',
                             help='overwrite .crt file if it exists')
    sign_parser.add_argument('--ca-key', default='koji-ca.key',
                             help='path to koji-ca.key (default: current '
                                  'working directory)')
    sign_parser.add_argument('--ca-cert', default='koji-ca.crt',
                             help='path to koji-ca.crt (default: current '
                                  'working directory)')
    sign_parser.add_argument('csr',
                             help='csr file to sign, eg. "kdreyer.csr"')
    sign_parser.set_defaults(func=sign)

    return parser.parse_args(argv)

+ 

+ 

def main():
    """
    Command-line entry point: parse arguments and run the sub-command.

    When a sub-command refuses to overwrite an existing file (OSError with
    errno EEXIST), print a hint about --force and exit with that error.
    Any other OSError propagates unchanged.
    """
    args = parse_args()
    handler = args.func
    try:
        handler(args)
    except OSError as err:
        if err.errno != errno.EEXIST:
            raise
        print('Try the --force option to overwrite files')
        raise SystemExit(err)

+ 

+ 

# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()

Add a tool that can generate the SSL certificates that Koji needs.

This provides a simple alternative to running openssl by hand.

The current Koji Server Howto guide includes a lot of steps to run openssl by hand, and I find I make mistakes easily in this area.

This tool makes it trivial to generate the required SSL keys, CSRs, and CA to set up a Koji environment. It has opinionated settings, like fixed, safe key sizes so you can get up and running out of the box quickly.

You can use this to create your own Koji-specific CA and sign HTTPS certs and user certs, or you can just generate the CSRs to submit to an official CA later.

This generates the certs with single commands and predictable filenames, so it's easy to wrap this with scripts or config management systems like Ansible.

@lucarval, @puiterwijk FYI: it looks like you scripted something similar in generate-certs (https://github.com/projectatomic/osbs-box/blob/master/generate-certs). You might review this tool and see whether generate-certs could call it.

@ktdreyer, thanks! I filed #88 to track looking into it.

Thanks for writing this

Commit 598d267 fixes this pull-request

Pull-Request has been merged by mikem

2 years ago
Metadata