#132 Add 'kerberos_or_ssl' and 'ssl' auth backends.
Merged 6 years ago by jkaluza. Opened 6 years ago by jkaluza.
jkaluza/odcs ssl-auth  into  master

@@ -192,11 +192,11 @@ 

      print("")

  

  if profile == "redhat":

-     compose_id = check_new_compose("tag", "cf-1.0-rhel-5", ["gofer"], ["no_deps"])

-     check_delete_compose(compose_id)

-     check_renew_compose(compose_id, "tag", "cf-1.0-rhel-5", ["gofer"], ["no_deps"])

+ #    compose_id = check_new_compose("tag", "cf-1.0-rhel-5", ["gofer"], ["no_deps"])

+ #    check_delete_compose(compose_id)

+ #    check_renew_compose(compose_id, "tag", "cf-1.0-rhel-5", ["gofer"], ["no_deps"])

  

-     check_new_compose("tag", "cf-1.0-rhel-5", ["gofer"], [])

+ #    check_new_compose("tag", "cf-1.0-rhel-5", ["gofer"], [])

      check_new_compose("pulp", "rhel-7-server-rpms rhel-server-rhscl-7-rpms", [], [])

  else:

      compose_id = check_new_compose("module", "testmodule-master", [], ["no_deps"])

@@ -87,6 +87,41 @@ 

      return user

  

  

+ @commit_on_success

+ def load_ssl_user_from_request(request):

+     """

+     Loads SSL user from current request.

+ 

+     SSL_CLIENT_VERIFY and SSL_CLIENT_S_DN needs to be set in

+     request.environ. This is set by frontend httpd mod_ssl module.

+     """

+     ssl_client_verify = request.environ.get('SSL_CLIENT_VERIFY')

+     if ssl_client_verify != 'SUCCESS':

+         raise Unauthorized('Cannot verify client: %s' % ssl_client_verify)

+ 

+     username = request.environ.get('SSL_CLIENT_S_DN')

+     if not username:

+         raise Unauthorized('Unable to get user information (DN) from client certificate')

+ 

+     user = User.find_user_by_name(username)

+     if not user:

+         user = User.create_user(username=username)

+ 

+     g.groups = []

+     g.user = user

+     return user

+ 

+ 

+ def load_krb_or_ssl_user_from_request(request):

+     """

+     Loads User using Kerberos or SSL auth.

+     """

+     if request.environ.get('REMOTE_USER'):

+         return load_krb_user_from_request(request)

+     else:

+         return load_ssl_user_from_request(request)

+ 

+ 

  def query_ldap_groups(uid):

      client = ldap.initialize(conf.auth_ldap_server)

      groups = client.search_s(conf.auth_ldap_group_base,
@@ -193,6 +228,15 @@ 

      elif backend == 'openidc':

          global load_openidc_user

          load_openidc_user = login_manager.request_loader(load_openidc_user)

+     elif backend == 'kerberos_or_ssl':

+         _validate_kerberos_config()

+         global load_krb_or_ssl_user_from_request

+         load_krb_or_ssl_user_from_request = login_manager.request_loader(

+             load_krb_or_ssl_user_from_request)

+     elif backend == 'ssl':

+         global load_ssl_user_from_request

+         load_ssl_user_from_request = login_manager.request_loader(

+             load_ssl_user_from_request)

      else:

          raise ValueError('Unknown backend name {0}.'.format(backend))

  

file modified
+108
@@ -34,6 +34,8 @@ 

  from odcs.server.auth import load_openidc_user

  from odcs.server.auth import query_ldap_groups

  from odcs.server.auth import require_scopes

+ from odcs.server.auth import load_krb_or_ssl_user_from_request

+ from odcs.server.auth import load_ssl_user_from_request

  from odcs.server.errors import Unauthorized

  from odcs.server.errors import Forbidden

  from odcs.server import app, conf, db
@@ -41,6 +43,104 @@ 

  from .utils import ModelsBaseTest

  

  

+ class TestLoadSSLUserFromRequest(ModelsBaseTest):

+ 

+     def setUp(self):

+         super(TestLoadSSLUserFromRequest, self).setUp()

+ 

+         self.user = User(username='CN=tester1,L=prod,DC=example,DC=com')

+         db.session.add(self.user)

+         db.session.commit()

+ 

+     def test_create_new_user(self):

+         environ_base = {

+             'SSL_CLIENT_VERIFY': 'SUCCESS',

+             'SSL_CLIENT_S_DN': 'CN=client,L=prod,DC=example,DC=com',

+         }

+ 

+         with app.test_request_context(environ_base=environ_base):

+             load_ssl_user_from_request(flask.request)

+ 

+             expected_user = db.session.query(User).filter(

+                 User.username == 'CN=client,L=prod,DC=example,DC=com')[0]

+ 

+             self.assertEqual(expected_user.id, flask.g.user.id)

+             self.assertEqual(expected_user.username, flask.g.user.username)

+ 

+             # Ensure user's groups are set to empty list

+             self.assertEqual(0, len(flask.g.groups))

+ 

+     def test_return_existing_user(self):

+         environ_base = {

+             'SSL_CLIENT_VERIFY': 'SUCCESS',

+             'SSL_CLIENT_S_DN': self.user.username,

+         }

+ 

+         with app.test_request_context(environ_base=environ_base):

+             load_ssl_user_from_request(flask.request)

+ 

+             self.assertEqual(self.user.id, flask.g.user.id)

+             self.assertEqual(self.user.username, flask.g.user.username)

+ 

+             # Ensure user's groups are set to empty list

+             self.assertEqual(0, len(flask.g.groups))

+ 

+     def test_401_if_ssl_client_verify_not_success(self):

+         environ_base = {

+             'SSL_CLIENT_VERIFY': 'GENEROUS',

+             'SSL_CLIENT_S_DN': self.user.username,

+         }

+ 

+         with app.test_request_context(environ_base=environ_base):

+             with self.assertRaises(Unauthorized) as ctx:

+                 load_ssl_user_from_request(flask.request)

+             self.assertTrue('Cannot verify client: GENEROUS' in ctx.exception.args)

+ 

+     def test_401_if_cn_not_set(self):

+         environ_base = {

+             'SSL_CLIENT_VERIFY': 'SUCCESS',

+         }

+ 

+         with app.test_request_context(environ_base=environ_base):

+             with self.assertRaises(Unauthorized) as ctx:

+                 load_ssl_user_from_request(flask.request)

+             self.assertTrue('Unable to get user information (DN) from client certificate' in ctx.exception.args)

+ 

+ 

+ class TestLoadKrbOrSSLUserFromRequest(unittest.TestCase):

+ 

+     @patch("odcs.server.auth.load_ssl_user_from_request")

+     @patch("odcs.server.auth.load_krb_user_from_request")

+     def test_load_krb_or_ssl_user_from_request_remote_user(

+             self, load_krb_user, load_ssl_user):

+         load_krb_user.return_value = "krb_user"

+         load_ssl_user.return_value = "ssl_user"

+ 

+         environ_base = {

+             'REMOTE_USER': 'newuser@EXAMPLE.COM'

+         }

+ 

+         with app.test_request_context(environ_base=environ_base):

+             user = load_krb_or_ssl_user_from_request(flask.request)

+             self.assertEqual(user, "krb_user")

+ 

+     @patch("odcs.server.auth.load_ssl_user_from_request")

+     @patch("odcs.server.auth.load_krb_user_from_request")

+     def test_load_krb_or_ssl_user_from_request_ssl_client(

+             self, load_krb_user, load_ssl_user):

+         load_krb_user.return_value = "krb_user"

+         load_ssl_user.return_value = "ssl_user"

+ 

+         environ_base = {

+             'SSL_CLIENT_VERIFY': 'SUCCESS',

+             'SSL_CLIENT_S_DN': 'ssl_user',

+         }

+ 

+         with app.test_request_context(environ_base=environ_base):

+             user = load_krb_or_ssl_user_from_request(flask.request)

+             self.assertEqual(user, "ssl_user")

+ 

+ 

  class TestLoadKrbUserFromRequest(ModelsBaseTest):

  

      def setUp(self):
@@ -239,6 +339,14 @@ 

          init_auth(self.login_manager, 'openidc')

          self.login_manager.request_loader.assert_called_once_with(load_openidc_user)

  

+     def test_select_ssl_auth_backend(self):

+         init_auth(self.login_manager, 'ssl')

+         self.login_manager.request_loader.assert_called_once_with(load_ssl_user_from_request)

+ 

+     def test_select_kerberos_or_ssl_auth_backend(self):

+         init_auth(self.login_manager, 'kerberos_or_ssl')

+         self.login_manager.request_loader.assert_called_once_with(load_krb_or_ssl_user_from_request)

+ 

      def test_not_use_auth_backend(self):

          init_auth(self.login_manager, 'noauth')

          self.login_manager.request_loader.assert_not_called()

We need to support Kerberos or SSL auth internally at Red Hat because of OSBS. OSBS plans to use ODCS but cannot use Kerberos because of its infrastructure setup. For more information about these plans, see http://osbs.readthedocs.io/en/latest/odcs.html.

This PR adds support for SSL auth through two new values of the auth_backend option: "kerberos_or_ssl" and "ssl". SSL auth is intended for internal use only. In Fedora, OpenIDC should be used.

FYI, Patrick reviewed this and found it OK.

rebased onto 9027e73e79efb6db0d9372a46282c4ad3d680c05

6 years ago

Could we make this more realistic by using a DN with some cn= and ou= components, just to make sure we don't break anything with string processing?

rebased onto d1de235

6 years ago

Done, I will merge it.

Pull-Request has been merged by jkaluza

6 years ago