| |
@@ -19,16 +19,21 @@
|
| |
# SOFTWARE.
|
| |
#
|
| |
# Written by Ralph Bean <rbean@redhat.com>
|
| |
+ # Written by Matt Prahl <mprahl@redhat.com>
|
| |
+ from os import path, environ
|
| |
|
| |
from nose.tools import eq_
|
| |
-
|
| |
import unittest
|
| |
import mock
|
| |
- from mock import patch
|
| |
+ from mock import patch, PropertyMock, Mock
|
| |
+ import kerberos
|
| |
+ import ldap3
|
| |
+ from flask import Response
|
| |
+ from werkzeug.exceptions import Unauthorized as FlaskUnauthorized
|
| |
|
| |
import module_build_service.auth
|
| |
import module_build_service.errors
|
| |
- from os import path
|
| |
+ import module_build_service.config as mbs_config
|
| |
|
| |
|
| |
class TestAuthModule(unittest.TestCase):
|
| |
@@ -100,12 +105,12 @@
|
| |
eq_(username, name)
|
| |
eq_(groups, set(get_user_info.return_value["groups"]))
|
| |
|
| |
- def test_disable_authentication(self):
|
| |
- with patch.dict('module_build_service.app.config', {'NO_AUTH': True}, clear=True):
|
| |
- request = mock.MagicMock()
|
| |
- username, groups = module_build_service.auth.get_user(request)
|
| |
- eq_(username, "anonymous")
|
| |
- eq_(groups, {"packager"})
|
| |
@patch.object(mbs_config.Config, 'no_auth', new_callable=PropertyMock, return_value=True)
def test_disable_authentication(self, conf_no_auth):
    """
    Check that get_user reports the "anonymous" user in the "packager" group
    while the ``no_auth`` config property is patched to True.
    """
    user_info = module_build_service.auth.get_user(mock.MagicMock())
    # get_user is expected to hand back exactly a (username, groups) pair
    name, membership = user_info
    eq_(name, "anonymous")
    eq_(membership, {"packager"})
|
| |
|
| |
@patch('module_build_service.auth.client_secrets', None)
|
| |
def test_misconfiguring_oidc_client_secrets_should_be_failed(self):
|
| |
@@ -173,3 +178,191 @@
|
| |
|
| |
self.assertEquals(str(cm.exception),
|
| |
"OIDC_REQUIRED_SCOPE must be set in server config.")
|
| |
+
|
| |
+
|
| |
class KerberosMockConfig(object):
    """
    Context manager that patches the MBS configuration for Kerberos/LDAP tests.

    While active, the ``auth_method`` property of ``mbs_config.Config`` reports
    'kerberos' and the LDAP/Kerberos properties report the values passed to the
    constructor. All patches are undone on exit.
    """

    def __init__(self, uri='ldaps://test.example.local:636', dn='ou=groups,dc=domain,dc=local',
                 kt='/path/to/keytab', host='mbs.domain.local'):
        """
        :param uri: a string overriding config.ldap_uri
        :param dn: a string overriding config.ldap_groups_dn
        :param kt: a string overriding config.kerberos_keytab
        :param host: a string overriding config.kerberos_http_host
        """
        self.uri = uri
        self.dn = dn
        self.kt = kt
        self.host = host
        # Patchers that are currently started, in start order. Always defined so
        # that __exit__ is safe even if __enter__ never ran or failed part-way.
        self._started = []

    def __enter__(self):
        """
        Start one PropertyMock patch per overridden config property.

        If any patch fails to start, the ones already started are stopped before
        the error propagates, so the Config class is never left half-patched.

        :return: this instance, so ``with KerberosMockConfig() as cfg`` works
        """
        overrides = (
            ('auth_method', 'kerberos'),
            ('ldap_uri', self.uri),
            ('ldap_groups_dn', self.dn),
            ('kerberos_keytab', self.kt),
            ('kerberos_http_host', self.host),
        )
        try:
            for prop_name, value in overrides:
                patcher = patch.object(mbs_config.Config, prop_name, new_callable=PropertyMock)
                mocked_prop = patcher.start()
                # Record the patcher as soon as it is live so cleanup can reach it
                self._started.append(patcher)
                mocked_prop.return_value = value
        except Exception:
            self._stop_all()
            raise
        return self

    def __exit__(self, *args):
        """
        Stop every patch started by __enter__. Returns None, so an exception
        raised inside the ``with`` body is not suppressed.
        """
        self._stop_all()

    def _stop_all(self):
        """Stop the started patches in reverse start order and forget them."""
        while self._started:
            self._started.pop().stop()
|
| |
+
|
| |
+
|
| |
class TestAuthModuleKerberos(unittest.TestCase):
    """
    Tests for ``module_build_service.auth.get_user_kerberos``.

    The ``kerberos`` GSSAPI functions are patched out, LDAP is served by an
    ldap3 ``MOCK_SYNC`` connection, and the relevant config properties are
    overridden with ``KerberosMockConfig``.
    """

    @staticmethod
    def _mock_request(headers):
        """
        Build a mocked request whose ``headers`` behave like the given dict.

        :param headers: a dict backing ``[]`` lookup, item assignment and ``in``
            checks on ``request.headers``
        :return: a ``mock.MagicMock`` standing in for the request
        """
        request = mock.MagicMock()
        request.headers.return_value = mock.MagicMock(spec_set=dict)
        request.headers.__getitem__.side_effect = headers.__getitem__
        request.headers.__setitem__.side_effect = headers.__setitem__
        request.headers.__contains__.side_effect = headers.__contains__
        return request

    # NOTE: patch decorators are applied bottom-up, so the mock arguments arrive
    # in reverse decorator order; patch.dict injects no argument.
    # NOTE(review): os.environ is patched so the test cannot leak environment
    # changes - presumably get_user_kerberos may set KRB5_KTNAME; confirm.
    @patch('kerberos.authGSSServerInit', return_value=(kerberos.AUTH_GSS_COMPLETE, object()))
    @patch('kerberos.authGSSServerStep', return_value=kerberos.AUTH_GSS_COMPLETE)
    @patch('kerberos.authGSSServerResponse', return_value='STOKEN')
    @patch('kerberos.authGSSServerUserName', return_value='mprahl@EXAMPLE.ORG')
    @patch('kerberos.authGSSServerClean')
    @patch('kerberos.getServerPrincipalDetails')
    @patch.dict('os.environ')
    @patch('module_build_service.auth.stack')
    def test_get_user_kerberos(self, stack, principal, clean, name, response,
                               step, init):
        """
        Test that authentication works with Kerberos and LDAP
        """
        stack.return_value = Mock()
        request = self._mock_request({'Authorization': 'foobar'})

        # Create the mock LDAP instance
        server = ldap3.Server('ldaps://test.domain.local')
        connection = ldap3.Connection(server, client_strategy=ldap3.MOCK_SYNC)
        base_dn = 'dc=domain,dc=local'
        factory_group_attrs = {
            'objectClass': ['top', 'posixGroup'],
            'memberUid': ['mprahl', 'tbrady'],
            'gidNumber': 1234,
            'cn': ['factory2-devs']
        }
        devs_group_attrs = {
            'objectClass': ['top', 'posixGroup'],
            'memberUid': ['mprahl', 'mikeb'],
            'gidNumber': 1235,
            'cn': ['devs']
        }
        athletes_group_attrs = {
            'objectClass': ['top', 'posixGroup'],
            'memberUid': ['tbrady', 'rgronkowski'],
            'gidNumber': 1236,
            'cn': ['athletes']
        }
        mprahl_attrs = {
            'memberOf': ['cn=Employee,ou=groups,{0}'.format(base_dn)],
            'uid': ['mprahl'],
            'cn': ['mprahl'],
            'objectClass': ['top', 'person']
        }
        connection.strategy.add_entry('cn=factory2-devs,ou=groups,{0}'.format(base_dn),
                                      factory_group_attrs)
        connection.strategy.add_entry('cn=athletes,ou=groups,{0}'.format(base_dn),
                                      athletes_group_attrs)
        connection.strategy.add_entry('cn=devs,ou=groups,{0}'.format(base_dn), devs_group_attrs)
        connection.strategy.add_entry('cn=mprahl,ou=users,{0}'.format(base_dn), mprahl_attrs)

        # Only the groups listing 'mprahl' as a memberUid are expected back
        groups = {'devs', 'factory2-devs'}
        with patch('ldap3.Connection') as mock_ldap_con, KerberosMockConfig():
            mock_ldap_con.return_value = connection
            self.assertEqual(module_build_service.auth.get_user_kerberos(request),
                             ('mprahl', groups))

    def test_auth_header_not_set(self):
        """
        Test that an Unauthorized exception is returned when there is no authorization header
        set.
        """
        request = self._mock_request({})

        with KerberosMockConfig():
            with self.assertRaises(FlaskUnauthorized) as cm:
                module_build_service.auth.get_user_kerberos(request)
            response = cm.exception.response
            self.assertEqual(response.www_authenticate.to_header().strip(), 'Negotiate')
            self.assertEqual(response.status, '401 UNAUTHORIZED')

    @patch.dict(environ)
    def test_keytab_not_set(self):
        """
        Test that authentication fails when the keytab is not set
        """
        # patch.dict restores the variable after the test if it was set
        environ.pop('KRB5_KTNAME', None)
        request = self._mock_request({'Authorization': 'foobar'})

        with KerberosMockConfig(kt=''):
            with self.assertRaises(module_build_service.errors.Unauthorized) as cm:
                module_build_service.auth.get_user_kerberos(request)
            self.assertEqual(str(cm.exception),
                             'Kerberos: set the config value of "KERBEROS_KEYTAB" '
                             'or the environment variable "KRB5_KTNAME" to your '
                             'keytab file')

    # Set the return value to something not 0 (continue) or 1 (complete)
    @patch('kerberos.authGSSServerInit', return_value=(100, object()))
    @patch('kerberos.authGSSServerStep', return_value=kerberos.AUTH_GSS_COMPLETE)
    @patch('kerberos.authGSSServerResponse', return_value='STOKEN')
    @patch('kerberos.authGSSServerUserName', return_value='mprahl@EXAMPLE.ORG')
    @patch('kerberos.authGSSServerClean')
    @patch('kerberos.getServerPrincipalDetails')
    @patch.dict('os.environ')
    @patch('module_build_service.auth.stack')
    def test_get_user_kerberos_invalid_ticket(self, stack, principal, clean, name, response,
                                              step, init):
        """
        Test that authentication fails with an invalid Kerberos ticket
        """
        stack.return_value = Mock()
        request = self._mock_request({'Authorization': 'foobar'})

        with KerberosMockConfig():
            with self.assertRaises(module_build_service.errors.Forbidden) as cm:
                module_build_service.auth.get_user_kerberos(request)
            self.assertEqual(str(cm.exception), 'Invalid Kerberos ticket')
|
| |