From b320d2f1d7874b72a031ea9560869636f80e984d Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Dec 07 2017 23:16:06 +0000 Subject: PR#708: Implement support for keytab in gssapi codepaths Merges #708 https://pagure.io/koji/pull-request/708 --- diff --git a/koji/__init__.py b/koji/__init__.py index 67819e9..f5e2374 100644 --- a/koji/__init__.py +++ b/koji/__init__.py @@ -2126,16 +2126,17 @@ class ClientSession(object): principal. The principal must be in the "ProxyPrincipals" list on the server side.""" - if principal is None and keytab is None and ccache is None: - try: - # Silently try GSSAPI first - if self.gssapi_login(proxyuser=proxyuser): - return True - except: - if krbV: - pass - else: - raise + try: + # Silently try GSSAPI first + if self.gssapi_login(principal, keytab, ccache, proxyuser=proxyuser): + return True + except Exception as e: + if krbV: + e_str = ''.join(traceback.format_exception_only(type(e), e)) + self.logger.debug('gssapi auth failed: %s', e_str) + pass + else: + raise if not krbV: raise PythonImportError( @@ -2224,7 +2225,7 @@ class ClientSession(object): # else return host - def gssapi_login(self, proxyuser=None): + def gssapi_login(self, principal=None, keytab=None, ccache=None, proxyuser=None): if not HTTPKerberosAuth: raise PythonImportError( "Please install python-requests-kerberos to use GSSAPI." @@ -2241,21 +2242,38 @@ class ClientSession(object): # 60 second timeout during login sinfo = None + old_env = {} old_opts = self.opts self.opts = old_opts.copy() - self.opts['timeout'] = 60 - self.opts['auth'] = HTTPKerberosAuth() try: + self.opts['timeout'] = 60 + kwargs = {} + if keytab: + old_env['KRB5_CLIENT_KTNAME'] = os.environ.get('KRB5_CLIENT_KTNAME') + os.environ['KRB5_CLIENT_KTNAME'] = keytab + if ccache: + old_env['KRB5CCNAME'] = os.environ.get('KRB5CCNAME') + os.environ['KRB5CCNAME'] = ccache + if principal: + kwargs['principal'] = principal + self.opts['auth'] = HTTPKerberosAuth(**kwargs) try: # Depending on the server configuration, we might not be able to # connect without client certificate, which means that the conn # will fail with a handshake failure, which is retried by default. sinfo = self._callMethod('sslLogin', [proxyuser], retry=False) - except: + except Exception as e: + e_str = ''.join(traceback.format_exception_only(type(e), e)) + self.logger.debug('gssapi auth failed: %s', e_str) # Auth with https didn't work. Restore for the next attempt. self.baseurl = old_baseurl finally: self.opts = old_opts + for key in old_env: + if old_env[key] is None: + del os.environ[key] + else: + os.environ[key] = old_env[key] if not sinfo: raise AuthError('unable to obtain a session') diff --git a/tests/test_lib/test_gssapi.py b/tests/test_lib/test_gssapi.py new file mode 100644 index 0000000..4927f96 --- /dev/null +++ b/tests/test_lib/test_gssapi.py @@ -0,0 +1,67 @@ +from __future__ import absolute_import +import mock +import os +import unittest + +import koji + + +class TestGSSAPI(unittest.TestCase): + + def setUp(self): + self.session = koji.ClientSession('https://koji.example.com/kojihub', {}) + self.session._callMethod = mock.MagicMock(name='_callMethod') + + def tearDown(self): + mock.patch.stopall() + + maxDiff = None + + @mock.patch('koji.HTTPKerberosAuth', new=None) + def test_gssapi_disabled(self): + with self.assertRaises(ImportError): + self.session.gssapi_login() + + def test_gssapi_login(self): + old_environ = dict(**os.environ) + self.session.gssapi_login() + self.session._callMethod.assert_called_once_with('sslLogin', [None], + retry=False) + self.assertEqual(old_environ, dict(**os.environ)) + + def test_gssapi_login_keytab(self): + principal = 'user@EXAMPLE.COM' + keytab = '/path/to/keytab' + ccache = '/path/to/cache' + old_environ = dict(**os.environ) + self.session.gssapi_login(principal, keytab, ccache) + self.session._callMethod.assert_called_once_with('sslLogin', [None], + retry=False) + self.assertEqual(old_environ, dict(**os.environ)) + + def test_gssapi_login_error(self): + old_environ = dict(**os.environ) + self.session._callMethod.side_effect = Exception('login failed') + with self.assertRaises(koji.AuthError): + self.session.gssapi_login() + self.session._callMethod.assert_called_once_with('sslLogin', [None], + retry=False) + self.assertEqual(old_environ, dict(**os.environ)) + + def test_gssapi_login_http(self): + old_environ = dict(**os.environ) + url1 = 'http://koji.example.com/kojihub' + url2 = 'https://koji.example.com/kojihub' + + # successful gssapi auth should force https + self.session.baseurl = url1 + self.session.gssapi_login() + self.assertEqual(self.session.baseurl, url2) + + # failed gssapi auth should leave the url alone + self.session.baseurl = url1 + self.session._callMethod.side_effect = Exception('login failed') + with self.assertRaises(koji.AuthError): + self.session.gssapi_login() + self.assertEqual(self.session.baseurl, url1) + self.assertEqual(old_environ, dict(**os.environ))