From 9fd3731ff7fb4e383e2f2787d8b6ab1c5904247a Mon Sep 17 00:00:00 2001 From: Chenxiong Qi Date: Oct 17 2019 02:20:59 +0000 Subject: Remove KerberosAuthenticate and get kerberos username from REMOTE_USER Signed-off-by: Chenxiong Qi --- diff --git a/module_build_service/auth.py b/module_build_service/auth.py index e28c564..f5f432d 100644 --- a/module_build_service/auth.py +++ b/module_build_service/auth.py @@ -2,21 +2,11 @@ # SPDX-License-Identifier: MIT """Auth system based on the client certificate and FAS account""" import json -import os -from socket import gethostname import ssl import requests -import kerberos -from flask import Response, g +from flask import g -# Starting with Flask 0.9, the _app_ctx_stack is the correct one, -# before that we need to use the _request_ctx_stack. -try: - from flask import _app_ctx_stack as stack -except ImportError: - from flask import _request_ctx_stack as stack -from werkzeug.exceptions import Unauthorized as FlaskUnauthorized from dogpile.cache import make_region from module_build_service.errors import Unauthorized, Forbidden @@ -141,105 +131,20 @@ def get_user_oidc(request): return username, groups -# Insired by https://pagure.io/waiverdb/blob/master/f/waiverdb/auth.py which was -# inspired by https://github.com/mkomitee/flask-kerberos/blob/master/flask_kerberos.py -class KerberosAuthenticate(object): - def __init__(self): - if conf.kerberos_http_host: - hostname = conf.kerberos_http_host - else: - hostname = gethostname() - self.service_name = "HTTP@{0}".format(hostname) - - # If the config specifies a keytab to use, then override the KRB5_KTNAME - # environment variable - if conf.kerberos_keytab: - os.environ["KRB5_KTNAME"] = conf.kerberos_keytab - - if "KRB5_KTNAME" in os.environ: - try: - principal = kerberos.getServerPrincipalDetails("HTTP", hostname) - except kerberos.KrbError as error: - raise Unauthorized('Kerberos: authentication failed with "{0}"'.format(str(error))) - - log.debug('Kerberos: server is identifying as "{0}"'.format(principal)) - else: - raise Unauthorized( - 'Kerberos: set the config value of "KERBEROS_KEYTAB" or the ' - 'environment variable "KRB5_KTNAME" to your keytab file' - ) - - def _gssapi_authenticate(self, token): - """ - Performs GSSAPI Negotiate Authentication - On success also stashes the server response token for mutual authentication - at the top of request context with the name kerberos_token, along with the - authenticated user principal with the name kerberos_user. - """ - state = None - ctx = stack.top - try: - rc, state = kerberos.authGSSServerInit(self.service_name) - if rc != kerberos.AUTH_GSS_COMPLETE: - log.error("Kerberos: unable to initialize server context") - return None - - rc = kerberos.authGSSServerStep(state, token) - if rc == kerberos.AUTH_GSS_COMPLETE: - log.debug("Kerberos: completed GSSAPI negotiation") - ctx.kerberos_token = kerberos.authGSSServerResponse(state) - ctx.kerberos_user = kerberos.authGSSServerUserName(state) - return rc - elif rc == kerberos.AUTH_GSS_CONTINUE: - log.debug("Kerberos: continuing GSSAPI negotiation") - return kerberos.AUTH_GSS_CONTINUE - else: - log.debug("Kerberos: unable to step server context") - return None - except kerberos.GSSError as error: - log.error("Kerberos: unable to authenticate: {0}".format(str(error))) - return None - finally: - if state: - kerberos.authGSSServerClean(state) - - def process_request(self, token): - """ - Authenticates the current request using Kerberos. - """ - kerberos_user = None - kerberos_token = None - ctx = stack.top - rc = self._gssapi_authenticate(token) - if rc == kerberos.AUTH_GSS_COMPLETE: - kerberos_user = ctx.kerberos_user - kerberos_token = ctx.kerberos_token - elif rc != kerberos.AUTH_GSS_CONTINUE: - raise Forbidden("Invalid Kerberos ticket") - - return kerberos_user, kerberos_token - - def get_user_kerberos(request): - user = None - if "Authorization" not in request.headers: - response = Response("Unauthorized", 401, {"WWW-Authenticate": "Negotiate"}) - exc = FlaskUnauthorized() - # For some reason, certain versions of werkzeug raise an exception when passing `response` - # in the constructor. This is a work-around. - exc.response = response - raise exc - header = request.headers.get("Authorization") - token = "".join(header.strip().split()[1:]) - user, kerberos_token = KerberosAuthenticate().process_request(token) + remote_user = request.environ.get("REMOTE_USER") + if not remote_user: + raise Unauthorized("REMOTE_USER is not properly set in the request.") + # Remove the realm - user = user.split("@")[0] + username, _ = remote_user.split("@") + # If the user is part of the whitelist, then the group membership check is skipped - if user in conf.allowed_users: + if username in conf.allowed_users: groups = [] else: - groups = get_ldap_group_membership(user) - return user, set(groups) + groups = get_ldap_group_membership(username) + return username, set(groups) @region.cache_on_arguments() diff --git a/requirements.txt b/requirements.txt index 1fc783c..b20389f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,6 @@ dogpile.cache fedmsg funcsigs # Python2 only futures # Python 2 only -kerberos kobo>=0.5.0 koji ldap3 diff --git a/tests/test_auth.py b/tests/test_auth.py index 42ccad9..af4d2b6 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,14 +1,11 @@ # -*- coding: utf-8 -*- # SPDX-License-Identifier: MIT -from os import path, environ +from os import path import pytest import requests import mock from mock import patch, PropertyMock, Mock -import kerberos -import ldap3 -from werkzeug.exceptions import Unauthorized as FlaskUnauthorized import module_build_service.auth import module_build_service.errors @@ -220,209 +217,29 @@ class TestAuthModule: module_build_service.auth.get_user(request) assert str(cm.value) == "OIDC_REQUIRED_SCOPE must be set in server config." + @pytest.mark.parametrize("remote_username", ["", None]) + def test_get_user_kerberos_unauthorized(self, remote_username): + request = Mock() + request.environ.get.return_value = remote_username -class KerberosMockConfig(object): - def __init__( - self, - uri="ldaps://test.example.local:636", - dn="ou=groups,dc=domain,dc=local", - kt="/path/to/keytab", - host="mbs.domain.local", - ): - """ - :param uri: a string overriding config.ldap_uri - :param dn: a string overriding config.ldap_groups_dn - :param kt: a string overriding config.kerberos_keytab - :param host: a string overriding config.kerberos_http_host - """ - self.uri = uri - self.dn = dn - self.kt = kt - self.host = host - - def __enter__(self): - self.auth_method_p = patch.object( - mbs_config.Config, "auth_method", new_callable=PropertyMock) - mocked_auth_method = self.auth_method_p.start() - mocked_auth_method.return_value = "kerberos" - - self.ldap_uri_p = patch.object(mbs_config.Config, "ldap_uri", new_callable=PropertyMock) - mocked_ldap_uri = self.ldap_uri_p.start() - mocked_ldap_uri.return_value = self.uri - - self.ldap_dn_p = patch.object( - mbs_config.Config, "ldap_groups_dn", new_callable=PropertyMock) - mocked_ldap_dn = self.ldap_dn_p.start() - mocked_ldap_dn.return_value = self.dn - - self.kerberos_keytab_p = patch.object( - mbs_config.Config, "kerberos_keytab", new_callable=PropertyMock) - mocked_kerberos_keytab = self.kerberos_keytab_p.start() - mocked_kerberos_keytab.return_value = self.kt - - self.kerberos_http_host_p = patch.object( - mbs_config.Config, "kerberos_http_host", new_callable=PropertyMock) - mocked_kerberos_http_host = self.kerberos_http_host_p.start() - mocked_kerberos_http_host.return_value = self.host - - def __exit__(self, *args): - self.auth_method_p.stop() - self.ldap_uri_p.stop() - self.ldap_dn_p.stop() - self.kerberos_keytab_p.stop() - self.kerberos_http_host_p.stop() - - -class TestAuthModuleKerberos: - @pytest.mark.parametrize("allowed_users", (set(), {"mprahl"})) - @patch("kerberos.authGSSServerInit", return_value=(kerberos.AUTH_GSS_COMPLETE, object())) - @patch("kerberos.authGSSServerStep", return_value=kerberos.AUTH_GSS_COMPLETE) - @patch("kerberos.authGSSServerResponse", return_value="STOKEN") - @patch("kerberos.authGSSServerUserName", return_value="mprahl@EXAMPLE.ORG") - @patch("kerberos.authGSSServerClean") - @patch("kerberos.getServerPrincipalDetails") - @patch.dict("os.environ") - @patch("module_build_service.auth.stack") - @patch.object(mbs_config.Config, "allowed_users", new_callable=PropertyMock) - def test_get_user_kerberos( - self, m_allowed_users, stack, principal, clean, name, response, step, init, allowed_users - ): - """ - Test that authentication works with Kerberos and LDAP - """ - m_allowed_users.return_value = allowed_users - mock_top = Mock() - stack.return_value = mock_top + with pytest.raises(module_build_service.errors.Unauthorized): + module_build_service.auth.get_user_kerberos(request) - headers = {"Authorization": "foobar"} - request = mock.MagicMock() - request.headers.return_value = mock.MagicMock(spec_set=dict) - request.headers.__getitem__.side_effect = headers.__getitem__ - request.headers.__setitem__.side_effect = headers.__setitem__ - request.headers.__contains__.side_effect = headers.__contains__ - - # Create the mock LDAP instance - server = ldap3.Server("ldaps://test.domain.local") - connection = ldap3.Connection(server, client_strategy=ldap3.MOCK_SYNC) - base_dn = "dc=domain,dc=local" - factory_group_attrs = { - "objectClass": ["top", "posixGroup"], - "memberUid": ["mprahl", "tbrady"], - "gidNumber": 1234, - "cn": ["factory2-devs"], - } - devs_group_attrs = { - "objectClass": ["top", "posixGroup"], - "memberUid": ["mprahl", "mikeb"], - "gidNumber": 1235, - "cn": ["devs"], - } - athletes_group_attrs = { - "objectClass": ["top", "posixGroup"], - "memberUid": ["tbrady", "rgronkowski"], - "gidNumber": 1236, - "cn": ["athletes"], - } - mprahl_attrs = { - "memberOf": ["cn=Employee,ou=groups,{0}".format(base_dn)], - "uid": ["mprahl"], - "cn": ["mprahl"], - "objectClass": ["top", "person"], - } - connection.strategy.add_entry( - "cn=factory2-devs,ou=groups,{0}".format(base_dn), factory_group_attrs - ) - connection.strategy.add_entry( - "cn=athletes,ou=groups,{0}".format(base_dn), athletes_group_attrs - ) - connection.strategy.add_entry("cn=devs,ou=groups,{0}".format(base_dn), devs_group_attrs) - connection.strategy.add_entry("cn=mprahl,ou=users,{0}".format(base_dn), mprahl_attrs) - - # If the user is in allowed_users, then group membership is not checked, and an empty set - # is just returned for the groups - if allowed_users: - expected_groups = set() - else: - expected_groups = {"devs", "factory2-devs"} - - with patch("ldap3.Connection") as mock_ldap_con, KerberosMockConfig(): - mock_ldap_con.return_value = connection - assert module_build_service.auth.get_user_kerberos(request) == ( - "mprahl", expected_groups) - - def test_auth_header_not_set(self): - """ - Test that an Unauthorized exception is returned when there is no authorization header - set. - """ - headers = {} - request = mock.MagicMock() - request.headers.return_value = mock.MagicMock(spec_set=dict) - request.headers.__getitem__.side_effect = headers.__getitem__ - request.headers.__setitem__.side_effect = headers.__setitem__ - request.headers.__contains__.side_effect = headers.__contains__ - - with KerberosMockConfig(): - try: - module_build_service.auth.get_user_kerberos(request) - assert False, "Unauthorized error not raised" - except FlaskUnauthorized as error: - assert error.response.www_authenticate.to_header().strip() == "Negotiate" - assert error.response.status == "401 UNAUTHORIZED" - - @patch.dict(environ) - def test_keytab_not_set(self): - """ - Test that authentication fails when the keytab is not set - """ - if "KRB5_KTNAME" in environ: - del environ["KRB5_KTNAME"] - - headers = {"Authorization": "foobar"} - request = mock.MagicMock() - request.headers.return_value = mock.MagicMock(spec_set=dict) - request.headers.__getitem__.side_effect = headers.__getitem__ - request.headers.__setitem__.side_effect = headers.__setitem__ - request.headers.__contains__.side_effect = headers.__contains__ - - with KerberosMockConfig(kt=""): - try: - module_build_service.auth.get_user_kerberos(request) - assert False, "Unauthorized error not raised" - except module_build_service.errors.Unauthorized as error: - assert str(error) == ( - 'Kerberos: set the config value of "KERBEROS_KEYTAB" ' - 'or the environment variable "KRB5_KTNAME" to your keytab file' - ) + @patch.object(module_build_service.auth.conf, "allowed_users", new=["someone", "somebody"]) + def test_get_user_kerberos_user_is_in_allowed_users_group(self): + request = Mock() + request.environ.get.return_value = "someone@realm" - # Set the return value to something not 0 (continue) or 1 (complete) - @patch("kerberos.authGSSServerInit", return_value=(100, object())) - @patch("kerberos.authGSSServerStep", return_value=kerberos.AUTH_GSS_COMPLETE) - @patch("kerberos.authGSSServerResponse", return_value="STOKEN") - @patch("kerberos.authGSSServerUserName", return_value="mprahl@EXAMPLE.ORG") - @patch("kerberos.authGSSServerClean") - @patch("kerberos.getServerPrincipalDetails") - @patch.dict("os.environ") - @patch("module_build_service.auth.stack") - def test_get_user_kerberos_invalid_ticket( - self, stack, principal, clean, name, response, step, init - ): - """ - Test that authentication fails with an invalid Kerberos ticket - """ - mock_top = Mock() - stack.return_value = mock_top - - headers = {"Authorization": "foobar"} - request = mock.MagicMock() - request.headers.return_value = mock.MagicMock(spec_set=dict) - request.headers.__getitem__.side_effect = headers.__getitem__ - request.headers.__setitem__.side_effect = headers.__setitem__ - request.headers.__contains__.side_effect = headers.__contains__ - - with KerberosMockConfig(): - try: - module_build_service.auth.get_user_kerberos(request) - assert False, "Forbidden error not raised" - except module_build_service.errors.Forbidden as error: - assert str(error) == ("Invalid Kerberos ticket") + username, groups = module_build_service.auth.get_user_kerberos(request) + assert "someone" == username + assert set() == groups + + @patch.object(module_build_service.auth.conf, "allowed_users", new=["someone", "somebody"]) + @patch("module_build_service.auth.get_ldap_group_membership", return_value=["group1", "group2"]) + def test_get_user_kerberos_user_is_not_in_allowed_users_group(self, get_ldap_group_membership): + request = Mock() + request.environ.get.return_value = "x-man@realm" + + username, groups = module_build_service.auth.get_user_kerberos(request) + assert "x-man" == username + assert {"group1", "group2"} == groups