| |
@@ -1,13 +1,17 @@
|
| |
from __future__ import absolute_import
|
| |
+
|
| |
import mock
|
| |
+
|
| |
try:
|
| |
import unittest2 as unittest
|
| |
except ImportError:
|
| |
import unittest
|
| |
+ import six
|
| |
|
| |
import koji
|
| |
import koji.auth
|
| |
|
| |
+
|
| |
class TestAuthSession(unittest.TestCase):
|
| |
def test_instance(self):
|
| |
"""Simple auth.Session instance"""
|
| |
@@ -27,7 +31,8 @@
|
| |
context.cnx.cursor.return_value = cursor
|
| |
cursor.fetchone.side_effect = [
|
| |
# get session
|
| |
- [koji.AUTHTYPE_NORMAL, 344, False, False, 'master', 'start_time', 'start_ts', 'update_time', 'update_ts', 'user_id'],
|
| |
+ [koji.AUTHTYPE_NORMAL, 344, False, False, 'master', 'start_time',
|
| |
+ 'start_ts', 'update_time', 'update_ts', 'user_id'],
|
| |
# get user
|
| |
['name', koji.USER_STATUS['NORMAL'], koji.USERTYPES['NORMAL']],
|
| |
# get excl.session
|
| |
@@ -107,7 +112,8 @@
|
| |
|
| |
s.makeShared()
|
| |
c = cursor.execute.call_args[0]
|
| |
- self.assertEqual(c[0], 'UPDATE sessions SET "exclusive"=NULL WHERE id=%(session_id)s')
|
| |
+ self.assertEqual(c[0],
|
| |
+ 'UPDATE sessions SET "exclusive"=NULL WHERE id=%(session_id)s')
|
| |
self.assertEqual(c[1]['session_id'], 123)
|
| |
|
| |
@mock.patch('socket.gethostbyname')
|
| |
@@ -165,17 +171,54 @@
|
| |
s.login('user', 'password')
|
| |
|
| |
@mock.patch('koji.auth.context')
|
| |
- def test_krbLogin(self, context):
|
| |
+ @mock.patch('koji.auth.socket')
|
| |
+ @mock.patch('koji.auth.base64')
|
| |
+ def test_krbLogin(self, base64, socket, context):
|
| |
# TODO
|
| |
s, cntext, cursor = self.get_session()
|
| |
context.cnx = cntext.cnx
|
| |
|
| |
- with self.assertRaises(koji.AuthError):
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
s.krbLogin('krb_req', 'proxyuser')
|
| |
+ self.assertEqual(cm.exception.args[0], 'Already logged in')
|
| |
|
| |
s.logged_in = False
|
| |
- with self.assertRaises(TypeError):
|
| |
- s.krbLogin('krb_req', 'proxyuser')
|
| |
+ if six.PY3:
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.krbLogin('krb_req', 'proxyuser')
|
| |
+ self.assertEqual(cm.exception.args[0], 'krbV module not installed')
|
| |
+ else:
|
| |
+ with mock.patch('koji.auth.krbV', create=True) as krbV:
|
| |
+ princ = mock.MagicMock()
|
| |
+ princ.name = 'princ_name'
|
| |
+ krbV.default_context.return_value \
|
| |
+ .rd_req.return_value = (mock.MagicMock(), 2, 3,
|
| |
+ [1, 2, princ])
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.krbLogin('krb_req', 'proxyuser')
|
| |
+ self.assertEqual(cm.exception.args[0],
|
| |
+ 'Kerberos principal princ_name is'
|
| |
+ ' not authorized to log in other users')
|
| |
+ context.opts = {'ProxyPrincipals': 'anyothers,' + princ.name,
|
| |
+ 'AuthPrincipal': 'authprinc',
|
| |
+ 'AuthKeytab': 'authkeytab',
|
| |
+ 'LoginCreatesUser': False,
|
| |
+ 'CheckClientIP': False}
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.krbLogin('krb_req', 'proxyuser@realm.com')
|
| |
+ self.assertEqual(cm.exception.args[0],
|
| |
+ 'Unknown Kerberos principal:'
|
| |
+ ' proxyuser@realm.com')
|
| |
+ context.opts['LoginCreatesUser'] = True
|
| |
+ context.cnx.cursor.return_value. \
|
| |
+ fetchone.side_effect = [None,
|
| |
+ None,
|
| |
+ None,
|
| |
+ (1,),
|
| |
+ ('name', 'type',
|
| |
+ koji.USER_STATUS['NORMAL']),
|
| |
+ ('session-id',)]
|
| |
+ s.krbLogin('krb_req', 'proxyuser@realm.com')
|
| |
|
| |
# functions outside Session object
|
| |
|
| |
@@ -187,7 +230,8 @@
|
| |
cursor.fetchone.return_value = ['name', 'status', 'usertype']
|
| |
|
| |
self.assertEqual(sorted(koji.auth.get_user_data(1).items()),
|
| |
- sorted({'name': 'name', 'status': 'status', 'usertype': 'usertype'}.items()))
|
| |
+ sorted({'name': 'name', 'status': 'status',
|
| |
+ 'usertype': 'usertype'}.items()))
|
| |
|
| |
cursor.fetchone.return_value = None
|
| |
self.assertEqual(koji.auth.get_user_data(1), None)
|
| |