| |
@@ -6,9 +6,43 @@
|
| |
|
| |
import koji
|
| |
import koji.auth
|
| |
+ import koji.db
|
| |
+ import datetime
|
| |
+
|
| |
+ UP = koji.auth.UpdateProcessor
|
| |
+ QP = koji.auth.QueryProcessor
|
| |
|
| |
|
| |
class TestAuthSession(unittest.TestCase):
|
| |
+ def getUpdate(self, *args, **kwargs):
|
| |
+ update = UP(*args, **kwargs)
|
| |
+ update.execute = mock.MagicMock()
|
| |
+ self.updates.append(update)
|
| |
+ return update
|
| |
+
|
| |
+ def getQuery(self, *args, **kwargs):
|
| |
+ query = QP(*args, **kwargs)
|
| |
+ query.execute = self.query_execute
|
| |
+ query.executeOne = self.query_executeOne
|
| |
+ query.singleValue = self.query_singleValue
|
| |
+ self.queries.append(query)
|
| |
+ return query
|
| |
+
|
| |
+ def setUp(self):
|
| |
+ self.context = mock.patch('kojihub.context').start()
|
| |
+ self.UpdateProcessor = mock.patch('koji.auth.UpdateProcessor',
|
| |
+ side_effect=self.getUpdate).start()
|
| |
+ self.updates = []
|
| |
+ self.query_execute = mock.MagicMock()
|
| |
+ self.query_executeOne = mock.MagicMock()
|
| |
+ self.query_singleValue = mock.MagicMock()
|
| |
+ self.QueryProcessor = mock.patch('koji.auth.QueryProcessor',
|
| |
+ side_effect=self.getQuery).start()
|
| |
+ self.queries = []
|
| |
+ # It seems MagicMock will not automatically handle attributes that
|
| |
+ # start with "assert"
|
| |
+ self.context.session.assertLogin = mock.MagicMock()
|
| |
+
|
| |
def test_instance(self):
|
| |
"""Simple auth.Session instance"""
|
| |
s = koji.auth.Session()
|
| |
@@ -23,54 +57,120 @@
|
| |
'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345',
|
| |
'REMOTE_ADDR': 'remote-addr',
|
| |
}
|
| |
- cursor = mock.MagicMock(name='cursor')
|
| |
- context.cnx.cursor.return_value = cursor
|
| |
- cursor.fetchone.side_effect = [
|
| |
- # get session
|
| |
- [koji.AUTHTYPES['NORMAL'], 344, False, False, 'master', 'start_time',
|
| |
- 'start_ts', 'update_time', 'update_ts', 'user_id'],
|
| |
- # get user
|
| |
- ['name', koji.USER_STATUS['NORMAL'], koji.USERTYPES['NORMAL']],
|
| |
- # get excl.session
|
| |
- None,
|
| |
- # upd. timestamp
|
| |
- None,
|
| |
- # upd callnum
|
| |
- None,
|
| |
- ]
|
| |
|
| |
+ self.query_executeOne.side_effect = [
|
| |
+ {'authtype': 2, 'callnum': 1, "date_part('epoch', start_time)": 1666599426.227002,
|
| |
+ "date_part('epoch', update_time)": 1666599426.254308, 'exclusive': None,
|
| |
+ 'expired': False, 'master': None,
|
| |
+ 'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002,
|
| |
+ tzinfo=datetime.timezone.utc),
|
| |
+ 'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308,
|
| |
+ tzinfo=datetime.timezone.utc),
|
| |
+ 'user_id': 1},
|
| |
+ {'name': 'kojiadmin', 'status': 0, 'usertype': 0}]
|
| |
+ self.query_singleValue.return_value = 123
|
| |
s = koji.auth.Session()
|
| |
- return s, context, cursor
|
| |
+ return s, context
|
| |
|
| |
- @mock.patch('koji.auth.context')
|
| |
- def test_basic_instance(self, context):
|
| |
+ def test_basic_instance(self):
|
| |
"""auth.Session instance"""
|
| |
- s, cntext, cursor = self.get_session()
|
| |
- context.cnx = cntext.cnx
|
| |
-
|
| |
- self.assertEqual(s.id, 123)
|
| |
- self.assertEqual(s.key, 'xyz')
|
| |
- self.assertEqual(s.hostip, 'remote-addr')
|
| |
- self.assertEqual(s.callnum, 345)
|
| |
- self.assertEqual(s.user_id, 'user_id')
|
| |
- self.assertEqual(s.authtype, koji.AUTHTYPES['NORMAL'])
|
| |
- self.assertEqual(s.master, 'master')
|
| |
- self.assertTrue(s.logged_in)
|
| |
-
|
| |
- # 5 SQL calls: get session, get user, get excl. session,
|
| |
- # update timestamp, update callnum
|
| |
- self.assertEqual(cursor.execute.call_count, 5)
|
| |
-
|
| |
- @mock.patch('koji.auth.context')
|
| |
- def test_getattr(self, context):
|
| |
+ s, cntext = self.get_session()
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+ update = self.updates[0]
|
| |
+
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values['id'], 123)
|
| |
+ self.assertEqual(update.clauses, ['id = %(id)i'])
|
| |
+ self.assertEqual(update.data, {})
|
| |
+ self.assertEqual(update.rawdata, {'update_time': 'NOW()'})
|
| |
+
|
| |
+ update = self.updates[1]
|
| |
+
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values['id'], 123)
|
| |
+ self.assertEqual(update.clauses, ['id = %(id)i'])
|
| |
+ self.assertEqual(update.data, {})
|
| |
+ self.assertEqual(update.rawdata, {'callnum': 345})
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 3)
|
| |
+
|
| |
+ query = self.queries[0]
|
| |
+ self.assertEqual(query.tables, ['sessions'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['hostip = %(hostip)s', 'id = %(id)i', 'key = %(key)s'])
|
| |
+ self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
| |
+ 'start_time', "date_part('epoch', start_time)",
|
| |
+ 'update_time', "date_part('epoch', update_time)",
|
| |
+ 'user_id'])
|
| |
+ self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
| |
+ 'start_time', 'start_ts', 'update_time', 'update_ts',
|
| |
+ 'user_id'])
|
| |
+ self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'})
|
| |
+
|
| |
+ query = self.queries[1]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
| |
+ self.assertEqual(query.values, {'user_id': 1})
|
| |
+
|
| |
+ query = self.queries[2]
|
| |
+ self.assertEqual(query.tables, ['sessions'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['exclusive = TRUE', 'expired = FALSE',
|
| |
+ 'user_id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
+
|
| |
+ def test_getattr(self):
|
| |
"""auth.Session instance"""
|
| |
- s, cntext, cursor = self.get_session()
|
| |
- context.cnx = cntext.cnx
|
| |
+ s, cntext = self.get_session()
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+ update = self.updates[0]
|
| |
+
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values['id'], 123)
|
| |
+ self.assertEqual(update.clauses, ['id = %(id)i'])
|
| |
+ self.assertEqual(update.data, {})
|
| |
+ self.assertEqual(update.rawdata, {'update_time': 'NOW()'})
|
| |
+
|
| |
+ update = self.updates[1]
|
| |
+
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values['id'], 123)
|
| |
+ self.assertEqual(update.clauses, ['id = %(id)i'])
|
| |
+ self.assertEqual(update.data, {})
|
| |
+ self.assertEqual(update.rawdata, {'callnum': 345})
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 3)
|
| |
+
|
| |
+ query = self.queries[0]
|
| |
+ self.assertEqual(query.tables, ['sessions'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['hostip = %(hostip)s', 'id = %(id)i', 'key = %(key)s'])
|
| |
+ self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
| |
+ 'start_time', "date_part('epoch', start_time)",
|
| |
+ 'update_time', "date_part('epoch', update_time)",
|
| |
+ 'user_id'])
|
| |
+ self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master',
|
| |
+ 'start_time', 'start_ts', 'update_time', 'update_ts',
|
| |
+ 'user_id'])
|
| |
+ self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'})
|
| |
+
|
| |
+ query = self.queries[1]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
| |
+ self.assertEqual(query.values, {'user_id': 1})
|
| |
+
|
| |
+ query = self.queries[2]
|
| |
+ self.assertEqual(query.tables, ['sessions'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['exclusive = TRUE', 'expired = FALSE',
|
| |
+ 'user_id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
|
| |
- # test
|
| |
- self.assertEqual(s.perms, {})
|
| |
- self.assertEqual(s.groups, {})
|
| |
- self.assertEqual(s.host_id, None)
|
| |
# all other names should raise error
|
| |
with self.assertRaises(AttributeError):
|
| |
s.non_existing_attribute
|
| |
@@ -78,7 +178,7 @@
|
| |
@mock.patch('koji.auth.context')
|
| |
def test_str(self, context):
|
| |
"""auth.Session string representation"""
|
| |
- s, cntext, cursor = self.get_session()
|
| |
+ s, cntext = self.get_session()
|
| |
context.cnx = cntext.cnx
|
| |
|
| |
s.logged_in = False
|
| |
@@ -90,7 +190,7 @@
|
| |
@mock.patch('koji.auth.context')
|
| |
def test_validate(self, context):
|
| |
"""Session.validate"""
|
| |
- s, cntext, cursor = self.get_session()
|
| |
+ s, cntext = self.get_session()
|
| |
context.cnx = cntext.cnx
|
| |
|
| |
s.lockerror = True
|
| |
@@ -103,20 +203,26 @@
|
| |
@mock.patch('koji.auth.context')
|
| |
def test_makeShared(self, context):
|
| |
"""Session.makeShared"""
|
| |
- s, cntext, cursor = self.get_session()
|
| |
- context.cnx = cntext.cnx
|
| |
-
|
| |
+ s, _ = self.get_session()
|
| |
s.makeShared()
|
| |
- c = cursor.execute.call_args[0]
|
| |
- self.assertEqual(c[0],
|
| |
- 'UPDATE sessions SET "exclusive"=NULL WHERE id=%(session_id)s')
|
| |
- self.assertEqual(c[1]['session_id'], 123)
|
| |
+ self.assertEqual(len(self.updates), 3)
|
| |
+ # check only last update query, first two are tested in test_basic_instance
|
| |
+ update = self.updates[2]
|
| |
+
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values['session_id'], 123)
|
| |
+ self.assertEqual(update.clauses, ['id=%(session_id)s'])
|
| |
+ self.assertEqual(update.data, {'exclusive': None})
|
| |
+ self.assertEqual(update.rawdata, {})
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 3)
|
| |
+ # all queries are tested in test_basic_instance
|
| |
|
| |
@mock.patch('socket.gethostbyname')
|
| |
@mock.patch('koji.auth.context')
|
| |
def test_get_remote_ip(self, context, gethostbyname):
|
| |
"""Session.get_remote_ip"""
|
| |
- s, cntext, cursor = self.get_session()
|
| |
+ s, cntext = self.get_session()
|
| |
|
| |
context.opts = {'CheckClientIP': False}
|
| |
self.assertEqual(s.get_remote_ip(), '-')
|
| |
@@ -133,8 +239,7 @@
|
| |
|
| |
@mock.patch('koji.auth.context')
|
| |
def test_login(self, context):
|
| |
- s, cntext, cursor = self.get_session()
|
| |
- context.cnx = cntext.cnx
|
| |
+ s, cntext = self.get_session()
|
| |
|
| |
# already logged in
|
| |
with self.assertRaises(koji.GenericError):
|
| |
@@ -153,22 +258,35 @@
|
| |
s.checkLoginAllowed.return_value = True
|
| |
s.createSession = mock.MagicMock()
|
| |
s.createSession.return_value = {'session-id': 'session-id'}
|
| |
- cursor.fetchone = mock.MagicMock()
|
| |
- cursor.fetchone.return_value = ['user_id']
|
| |
+ self.query_singleValue.return_value = 123
|
| |
+
|
| |
result = s.login('user', 'password')
|
| |
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+
|
| |
+ # check only last update query, first three are tested in test_basic_instance
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['name = %(user)s', 'password = %(password)s'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
+ self.assertEqual(query.values, {'user': 'user', 'password': 'password'})
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+ # all updates are tested in test_basic_instance
|
| |
+
|
| |
self.assertEqual(s.get_remote_ip.call_count, 1)
|
| |
- self.assertEqual(s.checkLoginAllowed.call_args, mock.call('user_id'))
|
| |
+ self.assertEqual(s.checkLoginAllowed.call_args, mock.call(123))
|
| |
self.assertEqual(result, s.createSession.return_value)
|
| |
|
| |
# one more try for non-existing user
|
| |
- cursor.fetchone.return_value = None
|
| |
+ self.query_singleValue.return_value = None
|
| |
with self.assertRaises(koji.AuthError):
|
| |
s.login('user', 'password')
|
| |
|
| |
@mock.patch('koji.auth.context')
|
| |
def test_checkKrbPrincipal(self, context):
|
| |
- s, cntext, cursor = self.get_session()
|
| |
+ s, cntext = self.get_session()
|
| |
self.assertIsNone(s.checkKrbPrincipal(None))
|
| |
context.opts = {'AllowedKrbRealms': '*'}
|
| |
self.assertIsNone(s.checkKrbPrincipal('any'))
|
| |
@@ -191,18 +309,318 @@
|
| |
' , example.org'}
|
| |
self.assertIsNone(s.checkKrbPrincipal('user@example.net'))
|
| |
|
| |
- # functions outside Session object
|
| |
+ def test_getUserIdFromKerberos(self):
|
| |
+ krb_principal = 'test-krb-principal'
|
| |
+ self.query_singleValue.return_value = 135
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.checkKrbPrincipal = mock.MagicMock()
|
| |
+ s.checkKrbPrincipal.return_value = True
|
| |
+
|
| |
+ s.getUserIdFromKerberos(krb_principal)
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ # check only last update query, first three are tested in test_basic_instance
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, ['user_krb_principals ON '
|
| |
+ 'users.id = user_krb_principals.user_id'])
|
| |
+ self.assertEqual(query.clauses, ['krb_principal = %(krb_principal)s'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
+ self.assertEqual(query.values, {'krb_principal': krb_principal})
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+ # all updates are tested in test_basic_instance
|
| |
+
|
| |
+ def test_getUserId(self):
|
| |
+ self.query_singleValue.return_value = 135
|
| |
+ s, cntext = self.get_session()
|
| |
+ username = 'test-user'
|
| |
+
|
| |
+ s.getUserId(username)
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ # check only last update query, first three are tested in test_basic_instance
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['name = %(username)s'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
+ self.assertEqual(query.values, {'username': username})
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+ # all updates are tested in test_basic_instance
|
| |
+
|
| |
+ def test_getHostId(self):
|
| |
+ self.query_singleValue.return_value = 199
|
| |
+ s, cntext = self.get_session()
|
| |
+
|
| |
+ s._getHostId()
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ # check only last update query, first three are tested in test_basic_instance
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['host'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['user_id = %(uid)d'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
+ self.assertEqual(query.values, {'uid': 1})
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+ # all updates are tested in test_basic_instance
|
| |
+
|
| |
+ def test_logout_not_logged(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.logged_in = False
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.logout()
|
| |
+ self.assertEqual(cm.exception.args[0], 'Not logged in')
|
| |
|
| |
@mock.patch('koji.auth.context')
|
| |
- def test_get_user_data(self, context):
|
| |
- """koji.auth.get_user_data"""
|
| |
- cursor = mock.MagicMock(name='cursor')
|
| |
- context.cnx.cursor.return_value = cursor
|
| |
- cursor.fetchone.return_value = ['name', 'status', 'usertype']
|
| |
+ def test_logout_logged(self, context):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.logged_in = True
|
| |
+ s.logout()
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 3)
|
| |
+ # all queries are tested in test_basic_instance
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 3)
|
| |
+ # check only last update query, first two are tested in test_basic_instance
|
| |
+ update = self.updates[2]
|
| |
+
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values, {'id': 123, 'id': 123})
|
| |
+ self.assertEqual(update.clauses, ['id = %(id)i OR master = %(id)i'])
|
| |
+ self.assertEqual(update.data, {'expired': True, 'exclusive': None})
|
| |
+ self.assertEqual(update.rawdata, {})
|
| |
+
|
| |
+ def test_logoutChild_not_logged(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.logged_in = False
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.logoutChild(111)
|
| |
+ self.assertEqual(cm.exception.args[0], 'Not logged in')
|
| |
+
|
| |
+ @mock.patch('koji.auth.context')
|
| |
+ def test_logoutChild_logged(self, context):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.logged_in = True
|
| |
+ s.logoutChild(111)
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 3)
|
| |
+ # all queries are tested in test_basic_instance
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 3)
|
| |
+ # check only last update query, first two are tested in test_basic_instance
|
| |
+ update = self.updates[2]
|
| |
+
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values, {'session_id': 111, 'master': 123})
|
| |
+ self.assertEqual(update.clauses, ['id = %(session_id)i', 'master = %(master)i'])
|
| |
+ self.assertEqual(update.data, {'expired': True, 'exclusive': None})
|
| |
+ self.assertEqual(update.rawdata, {})
|
| |
+
|
| |
+ def test_makeExclusive_not_master(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.master = 333
|
| |
+ with self.assertRaises(koji.GenericError) as cm:
|
| |
+ s.makeExclusive()
|
| |
+ self.assertEqual(cm.exception.args[0], 'subsessions cannot become exclusive')
|
| |
+
|
| |
+ def test_makeExclusive_already_exclusive(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.master = None
|
| |
+ s.exclusive = True
|
| |
+ with self.assertRaises(koji.GenericError) as cm:
|
| |
+ s.makeExclusive()
|
| |
+ self.assertEqual(cm.exception.args[0], 'session is already exclusive')
|
| |
+
|
| |
+ def test_makeExclusive_without_force(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.master = None
|
| |
+ s.exclusive = False
|
| |
+ self.query_singleValue.return_value = 123
|
| |
+
|
| |
+ with self.assertRaises(koji.AuthLockError) as cm:
|
| |
+ s.makeExclusive()
|
| |
+ self.assertEqual(cm.exception.args[0], 'Cannot get exclusive session')
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 5)
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+
|
| |
+ @mock.patch('koji.auth.context')
|
| |
+ def test_makeExclusive(self, context):
|
| |
+ s, cntext = self.get_session()
|
| |
+ s.master = None
|
| |
+ s.exclusive = False
|
| |
+ self.query_singleValue.return_value = 123
|
| |
+
|
| |
+ s.makeExclusive(force=True)
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 5)
|
| |
+ # check only last two queries, first two are tested in test_basic_instance
|
| |
+
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
+ self.assertEqual(query.values, {'user_id': 1})
|
| |
+
|
| |
+ query = self.queries[4]
|
| |
+ self.assertEqual(query.tables, ['sessions'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['exclusive = TRUE', 'expired = FALSE',
|
| |
+ 'user_id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['id'])
|
| |
+ self.assertEqual(query.values, {'user_id': 1})
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 4)
|
| |
+ # check only last two update queries, first two are tested in test_basic_instance
|
| |
+
|
| |
+ update = self.updates[2]
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values, {'excl_id': 123})
|
| |
+ self.assertEqual(update.clauses, ['id=%(excl_id)s'])
|
| |
+ self.assertEqual(update.data, {'expired': True, 'exclusive': None})
|
| |
+ self.assertEqual(update.rawdata, {})
|
| |
+
|
| |
+ update = self.updates[3]
|
| |
+ self.assertEqual(update.table, 'sessions')
|
| |
+ self.assertEqual(update.values, {'session_id': 123})
|
| |
+ self.assertEqual(update.clauses, ['id=%(session_id)s'])
|
| |
+ self.assertEqual(update.data, {'exclusive': True})
|
| |
+ self.assertEqual(update.rawdata, {})
|
| |
+
|
| |
+ def test_checkLoginAllowed(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 0, 'usertype': 0}]
|
| |
+ s.checkLoginAllowed(2)
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['id = %(user_id)i'])
|
| |
+ self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
| |
+ self.assertEqual(query.values, {'user_id': 2})
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+
|
| |
+ def test_checkLoginAllowed_not_normal_status(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 1, 'usertype': 0}]
|
| |
+
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.checkLoginAllowed(2)
|
| |
+ self.assertEqual(cm.exception.args[0], 'logins by testuser are not allowed')
|
| |
|
| |
- self.assertEqual(sorted(koji.auth.get_user_data(1).items()),
|
| |
- sorted({'name': 'name', 'status': 'status',
|
| |
- 'usertype': 'usertype'}.items()))
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['id = %(user_id)i'])
|
| |
+ self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
| |
+ self.assertEqual(query.values, {'user_id': 2})
|
| |
|
| |
- cursor.fetchone.return_value = None
|
| |
- self.assertEqual(koji.auth.get_user_data(1), None)
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+
|
| |
+ def test_checkLoginAllowed_not_exist_user(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ self.query_executeOne.side_effect = [None]
|
| |
+
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.checkLoginAllowed(2)
|
| |
+ self.assertEqual(cm.exception.args[0], 'invalid user_id: 2')
|
| |
+
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['id = %(user_id)i'])
|
| |
+ self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
| |
+ self.assertEqual(query.values, {'user_id': 2})
|
| |
+
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+
|
| |
+ def test_createUserFromKerberos_invalid_krb(self):
|
| |
+ s, cntext = self.get_session()
|
| |
+ krb_principal = 'test-krb-princ'
|
| |
+ with self.assertRaises(koji.AuthError) as cm:
|
| |
+ s.createUserFromKerberos(krb_principal)
|
| |
+ self.assertEqual(cm.exception.args[0], 'invalid Kerberos principal: %s' % krb_principal)
|
| |
+
|
| |
+ def test_createUserFromKerberos_user_not_exists(self):
|
| |
+ self.query_execute.return_value = None
|
| |
+ s, cntext = self.get_session()
|
| |
+ krb_principal = 'test-krb-princ@redhat.com'
|
| |
+ s.createUser = mock.MagicMock()
|
| |
+ s.createUser.return_value = 3
|
| |
+ s.createUserFromKerberos(krb_principal)
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON '
|
| |
+ 'users.id = user_krb_principals.user_id'])
|
| |
+ self.assertEqual(query.clauses, ['name = %(user_name)s'])
|
| |
+ self.assertEqual(query.columns, ['id', 'krb_principal'])
|
| |
+ self.assertEqual(query.values, {'user_name': 'test-krb-princ'})
|
| |
+
|
| |
+ def test_createUserFromKerberos_valid(self):
|
| |
+ self.query_execute.return_value = [{'id': 1, 'krb_principal': 'krb-user-1@redhat.com'},
|
| |
+ {'id': 1, 'krb_principal': 'krb-user-2@redhat.com'}]
|
| |
+ s, cntext = self.get_session()
|
| |
+ krb_principal = 'test-krb-princ@redhat.com'
|
| |
+ s.setKrbPrincipal = mock.MagicMock()
|
| |
+ s.setKrbPrincipal.return_value = 1
|
| |
+ s.createUserFromKerberos(krb_principal)
|
| |
+ self.assertEqual(len(self.queries), 4)
|
| |
+ self.assertEqual(len(self.updates), 2)
|
| |
+
|
| |
+ query = self.queries[3]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON '
|
| |
+ 'users.id = user_krb_principals.user_id'])
|
| |
+ self.assertEqual(query.clauses, ['name = %(user_name)s'])
|
| |
+ self.assertEqual(query.columns, ['id', 'krb_principal'])
|
| |
+ self.assertEqual(query.values, {'user_name': 'test-krb-princ'})
|
| |
+
|
| |
+ # functions outside Session object
|
| |
+
|
| |
+ def test_get_user_data(self):
|
| |
+ """koji.auth.get_user_data"""
|
| |
+ self.query_executeOne.return_value = None
|
| |
+ self.assertEqual(len(self.queries), 0)
|
| |
+
|
| |
+ self.query_executeOne.return_value = {'name': 'name', 'status': 'status',
|
| |
+ 'usertype': 'usertype'}
|
| |
+ koji.auth.get_user_data(1)
|
| |
+ self.assertEqual(len(self.queries), 1)
|
| |
+ query = self.queries[0]
|
| |
+ self.assertEqual(query.tables, ['users'])
|
| |
+ self.assertEqual(query.joins, None)
|
| |
+ self.assertEqual(query.clauses, ['id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['name', 'status', 'usertype'])
|
| |
+
|
| |
+ def test_get_user_groups(self):
|
| |
+ """koji.auth.get_user_groups"""
|
| |
+ koji.auth.get_user_groups(1)
|
| |
+ self.assertEqual(len(self.queries), 1)
|
| |
+ query = self.queries[0]
|
| |
+ self.assertEqual(query.tables, ['user_groups'])
|
| |
+ self.assertEqual(query.joins, ['users ON group_id = users.id'])
|
| |
+ self.assertEqual(query.clauses, ['active = TRUE', 'user_id=%(user_id)i',
|
| |
+ 'users.usertype=%(t_group)i'])
|
| |
+ self.assertEqual(query.columns, ['group_id', 'name'])
|
| |
+
|
| |
+ def test_get_user_perms(self):
|
| |
+ """koji.auth.get_user_perms"""
|
| |
+ koji.auth.get_user_perms(1)
|
| |
+ self.assertEqual(len(self.queries), 1)
|
| |
+ query = self.queries[0]
|
| |
+ self.assertEqual(query.tables, ['user_perms'])
|
| |
+ self.assertEqual(query.joins, ['permissions ON perm_id = permissions.id'])
|
| |
+ self.assertEqual(query.clauses, ['active = TRUE', 'user_id=%(user_id)s'])
|
| |
+ self.assertEqual(query.columns, ['name'])
|
| |
Fixes: https://pagure.io/koji/issue/3559