From af51847d8992e1e9f89a95c2d85df571d0ba79e0 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 22 2017 08:57:29 +0000 Subject: Add support for admin API token to pagure-admin --- diff --git a/pagure/cli/admin.py b/pagure/cli/admin.py index b292e9f..a4280ee 100644 --- a/pagure/cli/admin.py +++ b/pagure/cli/admin.py @@ -10,6 +10,7 @@ from __future__ import print_function import argparse +import datetime import logging import os @@ -21,7 +22,7 @@ if 'PAGURE_CONFIG' not in os.environ \ import pagure.exceptions import pagure.lib import pagure.lib.git -from pagure import (SESSION, generate_user_key_files) +from pagure import (SESSION, APP, generate_user_key_files) _log = logging.getLogger(__name__) @@ -57,6 +58,50 @@ def parse_arguments(): help='Generate a new hook token for every project in this instance') parser_hook_token.set_defaults(func=do_generate_hook_token) + # Admin token actions + parser_admin_token = subparsers.add_parser( + 'admin-token', + help='Manages the admin tokens for this instance') + + subsubparsers = parser_admin_token.add_subparsers(title='actions') + + # List admin token + parser_admin_list_token = subsubparsers.add_parser( + 'list', help="List the API admin token") + parser_admin_list_token.add_argument( + '--user', + help="User to associate or associated with the token") + parser_admin_list_token.add_argument( + '--token', help="API token") + parser_admin_list_token.add_argument( + '--active', default=False, action='store_true', + help="Only list active API token") + parser_admin_list_token.add_argument( + '--expired', default=False, action='store_true', + help="Only list expired API token") + parser_admin_token.set_defaults(func=do_list_admin_token) + + # Info about admin token + parser_admin_info_token = subsubparsers.add_parser( + 'info', help="Provide some information about a specific API token") + parser_admin_info_token.add_argument( + 'token', help="API token") + parser_admin_info_token.set_defaults(func=do_info_admin_token) + + # Expire admin token + parser_admin_expire_token = subsubparsers.add_parser( + 'expire', help="Expire a specific API token") + parser_admin_expire_token.add_argument( + 'token', help="API token") + parser_admin_expire_token.set_defaults(func=do_expire_admin_token) + + # Create admin token + parser_admin_create_token = subsubparsers.add_parser( + 'create', help="Create a new API token") + parser_admin_create_token.add_argument( + 'user', help="User to associate with the token") + parser_admin_create_token.set_defaults(func=do_create_admin_token) + return parser.parse_args() @@ -67,7 +112,7 @@ def _ask_confirmation(): return action.lower() in ['y', 'yes'] -def do_generate_acl(): +def do_generate_acl(_): """ Regenerate the gitolite ACL file. """ cmd = pagure.lib.git._get_gitolite_command() if not cmd: @@ -80,7 +125,7 @@ def do_generate_acl(): print('Gitolite ACLs updated') -def do_refresh_ssh(): +def do_refresh_ssh(_): """ Regenerate the user key files. """ print('Do you want to re-generate all the ssh keys for every user in ' 'the database? (Depending on your instance this may take a while ' @@ -101,6 +146,86 @@ def do_generate_hook_token(): print('Hook token all re-generated') +def do_list_admin_token(args): + """ List the admin token. """ + _log.debug('user: %s', args.user) + _log.debug('token: %s', args.token) + _log.debug('active: %s', args.active) + _log.debug('expire: %s', args.expired) + + acls = APP.config['ADMIN_API_ACLS'] + tokens = pagure.lib.search_token( + SESSION, acls, active=args.active, expired=args.expired) + for token in tokens: + print('%s -- %s -- %s' % ( + token.id, token.user.user, token.expiration)) + if not tokens: + print('No admin tokens found') + + +def do_info_admin_token(args): + """ Print out information about the specified API token. """ + _log.debug('token: %s', args.token) + + acls = APP.config['ADMIN_API_ACLS'] + token = pagure.lib.search_token(SESSION, acls, token=args.token) + if not token: + raise pagure.exceptions.PagureException('No such admin token found') + + print('%s -- %s -- %s' % ( + token.id, token.user.user, token.expiration)) + print('ACLs:') + for acl in token.acls: + print(' - %s' % acl.name) + + +def do_expire_admin_token(args): + """ Expire a specific admin token. """ + _log.debug('token: %s', args.token) + + acls = APP.config['ADMIN_API_ACLS'] + token = pagure.lib.search_token(SESSION, acls, token=args.token) + if not token: + raise pagure.exceptions.PagureException('No such admin token found') + + print('%s -- %s -- %s' % ( + token.id, token.user.user, token.expiration)) + print('ACLs:') + for acl in token.acls: + print(' - %s' % acl.name) + + print('Do you really want to expire this API token?') + if _ask_confirmation(): + token.expiration = datetime.datetime.utcnow() + SESSION.add(token) + SESSION.commit() + print('Token expired') + + +def do_create_admin_token(args): + """ Create a new admin token. """ + _log.debug('user: %s', args.user) + # Validate user first + user_obj = pagure.lib.get_user(SESSION, args.user) + + acls_list = APP.config['ADMIN_API_ACLS'] + for idx, acl in enumerate(acls_list): + print('%s. %s' % (idx, acl)) + + print('Which ACLs do you want to associated with this token?') + acls = raw_input('(Conna separated list): ') + acls_idx = [int(acl.strip()) for acl in acls.split(',')] + acls = [acls_list[acl] for acl in acls_idx] + + print('ACLs selected:') + for idx, acl in enumerate(acls_idx): + print('%s. %s' % (acls_idx[idx], acls[idx])) + + print('Do you want to create this API token?') + if _ask_confirmation(): + print(pagure.lib.add_token_to_user(SESSION, None, acls, args.user)) + + def main(): """ Start of the application. """ @@ -114,7 +239,7 @@ def main(): # Act based on the arguments given return_code = 0 try: - args.func() + args.func(args) except KeyboardInterrupt: print("\nInterrupted by user.") return_code = 1 diff --git a/pagure/lib/__init__.py b/pagure/lib/__init__.py index f01dd5f..43ad605 100644 --- a/pagure/lib/__init__.py +++ b/pagure/lib/__init__.py @@ -4218,3 +4218,55 @@ def get_obj_access(session, project_obj, obj): ) return query.first() + + +def search_token( + session, acls, user=None, token=None, active=False, expired=False): + ''' Searches the API tokens corresponding to the criterias specified. + + :arg session: the session to use to connect to the database. + :arg acls: List of the ACL associated with these API tokens + :arg user: restrict the API tokens to this given user + :arg token: restrict the API tokens to this specified token (if it + exists) + ''' + query = session.query( + model.Token + ).filter( + model.Token.id == model.TokenAcl.token_id + ).filter( + model.TokenAcl.acl_id == model.ACL.id + ) + + if isinstance(acls, list): + query = query.filter( + model.ACL.name.in_(acls) + ) + else: + query = query.filter( + model.ACL.name == acls + ) + + if user: + query = query.filter( + model.Token.user_id == model.User.id + ).filter( + model.User.user == user + ) + + if active: + query = query.filter( + model.Token.expiration > datetime.datetime.utcnow() + ) + elif expired: + query = query.filter( + model.Token.expiration <= datetime.datetime.utcnow() + ) + + if token: + query = query.filter( + model.Token.id == token + ) + return query.first() + else: + return query.all()