| |
@@ -34,6 +34,8 @@
|
| |
from odcs.server.auth import load_openidc_user
|
| |
from odcs.server.auth import query_ldap_groups
|
| |
from odcs.server.auth import require_scopes
|
| |
+ from odcs.server.auth import load_krb_or_ssl_user_from_request
|
| |
+ from odcs.server.auth import load_ssl_user_from_request
|
| |
from odcs.server.errors import Unauthorized
|
| |
from odcs.server.errors import Forbidden
|
| |
from odcs.server import app, conf, db
|
| |
@@ -41,6 +43,104 @@
|
| |
from .utils import ModelsBaseTest
|
| |
|
| |
|
| |
class TestLoadSSLUserFromRequest(ModelsBaseTest):
    """Tests for load_ssl_user_from_request driven by SSL_CLIENT_* environ keys."""

    def setUp(self):
        super(TestLoadSSLUserFromRequest, self).setUp()

        # A user already stored in the database, keyed by a certificate DN.
        self.user = User(username='CN=tester1,L=prod,DC=example,DC=com')
        db.session.add(self.user)
        db.session.commit()

    def test_create_new_user(self):
        """A verified DN that setUp did not store ends up as flask.g.user."""
        environ_base = {
            'SSL_CLIENT_VERIFY': 'SUCCESS',
            'SSL_CLIENT_S_DN': 'CN=client,L=prod,DC=example,DC=com',
        }

        with app.test_request_context(environ_base=environ_base):
            load_ssl_user_from_request(flask.request)

            # The user must be queryable by its DN after the loader ran.
            expected_user = db.session.query(User).filter(
                User.username == 'CN=client,L=prod,DC=example,DC=com')[0]

            self.assertEqual(expected_user.id, flask.g.user.id)
            self.assertEqual(expected_user.username, flask.g.user.username)

            # Ensure user's groups are set to empty list
            self.assertEqual(0, len(flask.g.groups))

    def test_return_existing_user(self):
        """A verified DN matching the stored user yields that same user."""
        environ_base = {
            'SSL_CLIENT_VERIFY': 'SUCCESS',
            'SSL_CLIENT_S_DN': self.user.username,
        }

        with app.test_request_context(environ_base=environ_base):
            load_ssl_user_from_request(flask.request)

            self.assertEqual(self.user.id, flask.g.user.id)
            self.assertEqual(self.user.username, flask.g.user.username)

            # Ensure user's groups are set to empty list
            self.assertEqual(0, len(flask.g.groups))

    def test_401_if_ssl_client_verify_not_success(self):
        """Any SSL_CLIENT_VERIFY value other than 'SUCCESS' raises Unauthorized."""
        environ_base = {
            'SSL_CLIENT_VERIFY': 'GENEROUS',
            'SSL_CLIENT_S_DN': self.user.username,
        }

        with app.test_request_context(environ_base=environ_base):
            with self.assertRaises(Unauthorized) as ctx:
                load_ssl_user_from_request(flask.request)
            # assertIn reports both the message and the actual args on failure,
            # unlike assertTrue(... in ...), which only says "False is not true".
            self.assertIn('Cannot verify client: GENEROUS',
                          ctx.exception.args)

    def test_401_if_cn_not_set(self):
        """A verified request without SSL_CLIENT_S_DN raises Unauthorized."""
        environ_base = {
            'SSL_CLIENT_VERIFY': 'SUCCESS',
        }

        with app.test_request_context(environ_base=environ_base):
            with self.assertRaises(Unauthorized) as ctx:
                load_ssl_user_from_request(flask.request)
            self.assertIn(
                'Unable to get user information (DN) from client certificate',
                ctx.exception.args)
|
| |
+
|
| |
+
|
| |
class TestLoadKrbOrSSLUserFromRequest(unittest.TestCase):
    """Check which patched loader's result load_krb_or_ssl_user_from_request returns."""

    @patch("odcs.server.auth.load_ssl_user_from_request")
    @patch("odcs.server.auth.load_krb_user_from_request")
    def test_load_krb_or_ssl_user_from_request_remote_user(
            self, krb_loader, ssl_loader):
        """With REMOTE_USER in the environ, the Kerberos loader's value is returned."""
        # Decorators apply bottom-up, so the Kerberos mock is the first argument.
        krb_loader.return_value = "krb_user"
        ssl_loader.return_value = "ssl_user"

        request_env = {'REMOTE_USER': 'newuser@EXAMPLE.COM'}

        with app.test_request_context(environ_base=request_env):
            loaded = load_krb_or_ssl_user_from_request(flask.request)
            self.assertEqual(loaded, "krb_user")

    @patch("odcs.server.auth.load_ssl_user_from_request")
    @patch("odcs.server.auth.load_krb_user_from_request")
    def test_load_krb_or_ssl_user_from_request_ssl_client(
            self, krb_loader, ssl_loader):
        """With only SSL_CLIENT_* in the environ, the SSL loader's value is returned."""
        krb_loader.return_value = "krb_user"
        ssl_loader.return_value = "ssl_user"

        request_env = {
            'SSL_CLIENT_VERIFY': 'SUCCESS',
            'SSL_CLIENT_S_DN': 'ssl_user',
        }

        with app.test_request_context(environ_base=request_env):
            loaded = load_krb_or_ssl_user_from_request(flask.request)
            self.assertEqual(loaded, "ssl_user")
|
| |
+
|
| |
+
|
| |
class TestLoadKrbUserFromRequest(ModelsBaseTest):
|
| |
|
| |
def setUp(self):
|
| |
@@ -239,6 +339,14 @@
|
| |
init_auth(self.login_manager, 'openidc')
|
| |
self.login_manager.request_loader.assert_called_once_with(load_openidc_user)
|
| |
|
| |
def test_select_ssl_auth_backend(self):
    """The 'ssl' backend hands load_ssl_user_from_request to request_loader."""
    init_auth(self.login_manager, 'ssl')
    request_loader = self.login_manager.request_loader
    request_loader.assert_called_once_with(load_ssl_user_from_request)
|
| |
+
|
| |
def test_select_kerberos_or_ssl_auth_backend(self):
    """The 'kerberos_or_ssl' backend hands the combined loader to request_loader."""
    init_auth(self.login_manager, 'kerberos_or_ssl')
    request_loader = self.login_manager.request_loader
    request_loader.assert_called_once_with(load_krb_or_ssl_user_from_request)
|
| |
+
|
| |
def test_not_use_auth_backend(self):
|
| |
init_auth(self.login_manager, 'noauth')
|
| |
self.login_manager.request_loader.assert_not_called()
|
| |
We need to support Kerberos or SSL auth internally at Red Hat because of OSBS. They plan to use ODCS but cannot use Kerberos because of their infrastructure setup. For more information about their plans, see http://osbs.readthedocs.io/en/latest/odcs.html.
This PR adds support for SSL auth through two new auth_backend values, "kerberos_or_ssl" and "ssl". SSL auth is planned to be used only internally. In Fedora, OpenIDC should be used.