#655 [frontend][cli][python] add CLI for permissions
Merged 5 years ago by msuchy. Opened 5 years ago by praiskup.
Unknown source permissions-cli  into  master

file modified
+122 -1
@@ -683,6 +683,41 @@

              module = self.client.module_proxy.build_from_url(ownername, projectname, args.url)

          print("Created module {0}".format(module.nsv))

  

def action_permissions_edit(self, args):
    """
    Apply the roles collected from --admin/--builder to a project.

    :param args: argparse namespace; ``args.project`` is handed to
        ``self.parse_name`` and ``args.permissions`` is forwarded unchanged
        to ``project_proxy.set_permissions``.
    :raises argparse.ArgumentTypeError: when ``args.permissions`` is empty,
        i.e. neither role option was given.
    """
    ownername, projectname = self.parse_name(args.project)
    requested = args.permissions
    if not requested:
        raise argparse.ArgumentTypeError(
            "neither --builder nor --admin specified")
    proxy = self.client.project_proxy
    proxy.set_permissions(ownername, projectname, requested)
    print("success")

+ 

def action_permissions_list(self, args):
    """
    Print every user's roles in a project, one indented line per role.

    :param args: argparse namespace; ``args.project`` is handed to
        ``self.parse_name``.
    """
    ownername, projectname = self.parse_name(args.project)
    response = self.client.project_proxy.get_permissions(ownername, projectname)
    by_user = response['permissions']
    for username in by_user:
        print(username + ":")
        for role, state in by_user[username].items():
            print("  {0}: {1}".format(role, state))

+ 

def action_permissions_request(self, args):
    """
    Request (or withdraw) roles for the calling user in a project.

    The CLI values stored under the ``'your user'`` key of
    ``args.permissions`` are translated to booleans ('request' -> True,
    'nothing' -> False) and sent via ``project_proxy.request_permissions``.

    :param args: argparse namespace with ``project`` and ``permissions``.
    :raises argparse.ArgumentTypeError: when no role option was given, or
        a role carries a value other than 'nothing'/'request'.
    """
    if not args.permissions:
        raise argparse.ArgumentTypeError(
            "neither --builder nor --admin specified")
    ownername, projectname = self.parse_name(args.project)

    # CLI keyword -> boolean flag sent to the API.
    keyword_to_flag = (('nothing', False), ('request', True))

    request = {}
    for role, value in args.permissions['your user'].items():
        matches = [flag for keyword, flag in keyword_to_flag
                   if value == keyword]
        if not matches:
            raise argparse.ArgumentTypeError(
                    "--{0} can be 'nothing' or 'request', "
                    "not '{1}'".format(role, value))
        request[role] = matches[0]

    self.client.project_proxy.request_permissions(
        ownername, projectname, request)
    print("success")

  

  def setup_parser():

      """
@@ -1132,6 +1167,92 @@

      parser_build_module_mmd_source.add_argument("--yaml", help="Path to modulemd file in yaml format")

      parser_build_module.set_defaults(func="action_build_module")

  

+ 

+     #########################################################

+     ###                   Permissions actions             ###

+     #########################################################

+ 

def make_permission_action(role, default_permission, default_username=None):
    """
    Build an argparse action class that collects role settings per user.

    Each use of the option stores ``{username: {role: permission}}`` into
    ``args.permissions`` (created on first use), so several options sharing
    the same ``dest`` accumulate into one dictionary.

    :param str role: role name recorded by the action (e.g. 'admin').
    :param str default_permission: permission used when the option value
        does not specify one.
    :param str default_username: when set, the option value is taken as the
        permission for this fixed user; when unset, the option value is
        ``USERNAME`` or ``USERNAME=PERMISSION``.
    :return: an ``argparse.Action`` subclass to pass as ``action=``.
    """
    class CustomAction(argparse.Action):
        """Action recording one ``role`` setting into ``args.permissions``."""

        def __call__(self, parser, args, argument, option_string=None):
            permission = self.default_permission
            username = self.default_username
            if username:
                # Username predefined (myself); an optional value only
                # overrides the permission and '=' is not interpreted.
                if argument:
                    permission = argument
            else:
                # User required, user=permission allowed.  Split on the
                # LAST '=' so the username itself may contain '='.
                if '=' in argument:
                    username, _, permission = argument.rpartition('=')
                else:
                    username = argument
                if not username:
                    raise argparse.ArgumentError(
                        self, "missing username in '{0}'".format(argument))

            if not args.permissions:
                args.permissions = {}
            user_roles = args.permissions.setdefault(username, {})
            if role in user_roles:
                raise argparse.ArgumentError(
                    self, "requested more than once for {0}".format(username))
            user_roles[role] = permission

    # Assigned after the class statement: a class body cannot read the
    # enclosing function's variables that share the attribute's name.
    CustomAction.default_permission = default_permission
    CustomAction.default_username = default_username
    return CustomAction

+ 

+     parser_permissions_edit = subparsers.add_parser(

+             "edit-permissions",

+             help="Edit roles/permissions on a copr project")

+     parser_permissions_edit.add_argument("project",

+             metavar='PROJECT',

+             help="An existing copr project")

+     edit_help = """Set the '{0}' role for USERNAME in PROJECT, VALUE can be one

+             of 'approved|request|nothing' (default=approved)"""

+     parser_permissions_edit.add_argument("--admin",

+             metavar='USERNAME[=VALUE]',

+             dest='permissions',

+             action=make_permission_action('admin', 'approved'),

+             help=edit_help.format('admin'))

+     parser_permissions_edit.add_argument("--builder",

+             metavar='USERNAME[=VALUE]',

+             dest='permissions',

+             action=make_permission_action('builder', 'approved'),

+             help=edit_help.format('builder'))

+     parser_permissions_edit.set_defaults(func='action_permissions_edit')

+ 

+     # list

+     parser_permissions_list = subparsers.add_parser(

+             "list-permissions",

+             help="Print the copr project roles/permissions")

+     parser_permissions_list.add_argument("project",

+             metavar='PROJECT',

+             help="An existing copr project")

+     parser_permissions_list.set_defaults(func='action_permissions_list')

+ 

+     # request

+     parser_permissions_request = subparsers.add_parser(

+             "request-permissions",

+             help="Request (or reject) your role in the copr project")

+     parser_permissions_request.add_argument("project",

+             metavar='PROJECT',

+             help="An existing copr project")

+     request_help = """Request/cancel request/remove your '{0}' permissions in

+             PROJECT.  VALUE can be one of 'request|nothing', default=request

+             """

+     parser_permissions_request.add_argument(

+             "--admin", nargs='?',

+             action=make_permission_action('admin', 'request', 'your user'),

+             dest='permissions',

+             help=request_help.format('admin'))

+     parser_permissions_request.add_argument(

+             "--builder", nargs='?',

+             action=make_permission_action('builder', 'request', 'your user'),

+             dest='permissions',

+             help=request_help.format('builder'))

+     parser_permissions_request.set_defaults(func='action_permissions_request')

+ 

      return parser

  

  
@@ -1190,7 +1311,7 @@

          sys.stderr.write("\nError: {0}\n".format(e))

          sys.exit(1)

      except argparse.ArgumentTypeError as e:

-         sys.stderr.write("\nError: {0}".format(e))

+         sys.stderr.write("\nError: {0}\n".format(e))

          sys.exit(2)

      except CoprException as e:

          sys.stderr.write("\nError: {0}\n".format(e))

file modified
+93
@@ -571,3 +571,96 @@

          assert "Watching build" in stdout

          assert "Build(s) 1 failed" in stderr

          assert len(mock_time.sleep.call_args_list) == 3

+ 

# NOTE: this test used to be defined twice under the same name; the second
# definition silently replaced the first, so only the config-mocked variant
# was ever collected.  Only that variant is kept.
@mock.patch('copr_cli.main.Commands.action_permissions_edit',
            new_callable=MagicMock())
@mock.patch('copr_cli.main.config_from_file', return_value=mock_config)
def test_edit_permissions_output(_config, action):
    """
    Check how 'edit-permissions' options end up in ``args.permissions``.

    The action method is patched out, so only argument parsing is exercised.
    """
    def test_me(cli_args, expected_output):
        main.main(['edit-permissions'] + cli_args)
        parsed = action.call_args[0][0]
        assert parsed.permissions == expected_output

    # The PROJECT positional argument is mandatory.
    with pytest.raises(SystemExit) as err:
        main.main(['edit-permissions'])
    assert err.value.code == 2
    assert len(action.call_args_list) == 0

    test_me(['some/project'], None)
    test_me(
        ['some/project', '--admin', 'a', '--admin', 'b'],
        {'a': {'admin': 'approved'},
         'b': {'admin': 'approved'}}
    )
    test_me(
        ['some/project', '--admin', 'praiskup=nothing', '--admin', 'b'],
        {'b': {'admin': 'approved'},
         'praiskup': {'admin': 'nothing'}}
    )
    test_me(
        ['some/project', '--builder', 'praiskup', '--admin', 'praiskup'],
        {'praiskup': {'admin': 'approved', 'builder': 'approved'}}
    )

+ 

@mock.patch('copr_cli.main.Commands.action_permissions_request',
            new_callable=MagicMock())
@mock.patch('copr_cli.main.config_from_file', return_value=mock_config)
def test_edit_permissions_request(_config, action):
    """
    Check how 'request-permissions' options end up in ``args.permissions``.

    The action method is patched out, so only argument parsing is exercised;
    value validation ('bad_status') happens later, in the action itself.
    """
    def test_me(cli_args, expected_output):
        main.main(['request-permissions'] + cli_args)
        parsed = action.call_args[0][0]
        assert parsed.permissions == expected_output

    # The PROJECT positional argument is mandatory.
    with pytest.raises(SystemExit) as err:
        main.main(['request-permissions'])
    assert err.value.code == 2
    assert len(action.call_args_list) == 0

    test_me(['some/project'], None)

    # The same role must not be requested twice.  Each failure is checked
    # separately; previously the first 'err' was overwritten unchecked.
    with pytest.raises(SystemExit) as err:
        test_me(['some/project', '--admin', '--admin'], None)
    assert err.value.code == 2
    with pytest.raises(SystemExit) as err:
        test_me(['some/project', '--admin', '--admin', 'b'], None)
    assert err.value.code == 2

    test_me(
        ['some/project', '--admin', 'bad_status'],
        {'your user': {'admin': 'bad_status'}})
    test_me(
        ['some/project', '--admin', '--builder'],
        {'your user': {'admin': 'request', 'builder': 'request'}})
    test_me(  # we don't parse '=' here
        ['some/project', '--admin', 'bad_status=nothing'],
        {'your user': {'admin': 'bad_status=nothing'}})

@@ -63,7 +63,8 @@

  from coprs.views.api_ns import api_general

  from coprs.views import apiv3_ns

  from coprs.views.apiv3_ns import (apiv3_general, apiv3_builds, apiv3_packages, apiv3_projects, apiv3_project_chroots,

-                                   apiv3_modules, apiv3_build_chroots)

+                                   apiv3_modules, apiv3_build_chroots,

+                                   apiv3_permissions)

  from coprs.views import coprs_ns

  from coprs.views.coprs_ns import coprs_builds

  from coprs.views.coprs_ns import coprs_general

@@ -43,6 +43,7 @@

  

  

  class PermissionEnum(with_metaclass(EnumType, object)):

+     # The text form is part of APIv3!

      vals = {"nothing": 0, "request": 1, "approved": 2}

  

      @classmethod

@@ -15,7 +15,7 @@

  from coprs import exceptions

  from coprs import helpers

  from coprs import models

- from coprs.exceptions import MalformedArgumentException

+ from coprs.exceptions import MalformedArgumentException, BadRequest

  from coprs.logic import users_logic

  from coprs.whoosheers import CoprWhoosheer

  from coprs.helpers import fix_protocol_for_backend
@@ -449,6 +449,70 @@

      def delete(cls, copr_permission):

          db.session.delete(copr_permission)

  

@classmethod
def validate_permission(cls, user, copr, permission, state):
    """
    Reject permission changes that can never be valid.

    :param user: the user whose permission is being changed
    :param copr: the project the permission applies to
    :param str permission: role name, 'admin' or 'builder'
    :param str state: one of the keys of ``helpers.PermissionEnum.vals``
    :raises BadRequest: for an unknown role, an unknown state, or when
        ``user`` is the owner of ``copr``.
    """
    known_roles = ['admin', 'builder']
    if permission not in known_roles:
        message = "invalid permission '{0}', allowed {1}".format(
            permission, '|'.join(known_roles))
        raise BadRequest(message)

    known_states = helpers.PermissionEnum.vals.keys()
    if state not in known_states:
        message = ("invalid '{0}' permission state '{1}', "
                   "use {2}".format(permission, state,
                                    '|'.join(known_states)))
        raise BadRequest(message)

    if user.id == copr.user_id:
        message = ("user '{0}' is owner of the '{1}' "
                   "project".format(user.name, copr.full_name))
        raise BadRequest(message)

+ 

@classmethod
def set_permissions(cls, request_user, copr, user, permission, state):
    """
    Set one role of ``user`` in ``copr`` on behalf of ``request_user``.

    :param request_user: user performing the change; checked with
        ``UsersLogic.raise_if_cant_update_copr``
    :param copr: project being modified
    :param user: user whose role is changed
    :param str permission: role name, see ``validate_permission``
    :param str state: textual state, see ``validate_permission``
    :return: ``(old_state, new_state)`` when the state changed, else None
    """
    users_logic.UsersLogic.raise_if_cant_update_copr(
        request_user, copr,
        "only owners and admins may update their projects permissions.")

    cls.validate_permission(user, copr, permission, state)

    # merge() hands back the session's object for this (user, copr) key.
    record = db.session.merge(
        models.CoprPermission(user_id=user.id, copr_id=copr.id))
    old_state = record.get_permission(permission)
    new_state = helpers.PermissionEnum(state)

    record.set_permission(permission, new_state)
    db.session.merge(record)

    if old_state != new_state:
        return (old_state, new_state)
    return None

+ 

@classmethod
def request_permission(cls, copr, user, permission, req_bool):
    """
    Record that ``user`` requests (True) or drops (False) a role in ``copr``.

    :param copr: project the request is about
    :param user: the requesting user
    :param str permission: role name, see ``validate_permission``
    :param bool req_bool: True maps to state 'request', False to 'nothing'
    :return: ``(old_state, new_state)`` when the state changed, else None
    :raises BadRequest: when ``req_bool`` is not exactly True/False, when
        validation fails, or when an already approved role is requested.
    """
    approved = helpers.PermissionEnum('approved')

    # Only the two bool singletons are accepted (1/0 are rejected too).
    if not isinstance(req_bool, bool):
        raise BadRequest("invalid '{0}' permission request '{1}', "
                         "expected True or False".format(permission,
                             req_bool))
    state = 'request' if req_bool else 'nothing'

    cls.validate_permission(user, copr, permission, state)

    record = db.session.merge(
        models.CoprPermission(user_id=user.id, copr_id=copr.id))
    old_state = record.get_permission(permission)
    if state == 'request' and old_state == approved:
        raise BadRequest("You already are '{0}' in '{1}'".format(
                             permission, copr.full_name))

    new_state = helpers.PermissionEnum(state)
    record.set_permission(permission, new_state)

    return (old_state, new_state) if old_state != new_state else None

+ 

  

  class CoprDirsLogic(object):

      @classmethod

@@ -9,6 +9,9 @@

      subject = None

      text = None

  

def __str__(self):
    """Return the subject and the text separated by one blank line."""
    parts = (self.subject, self.text)
    return "\n\n".join(parts)

+ 

  

  class PermissionRequestMessage(Message):

      def __init__(self, copr, applicant, permission_dict):
@@ -18,19 +21,24 @@

          :param models.CoprPermission permission: permission object

          :param dict permission_dict: {"old_builder": int, "old_admin": int, "new_builder": int, "new_admin": int}

          """

-         self.subject = "[Copr] {0}: {1} is asking permissions".format(copr.name, applicant.name)

-         self.text = ("{0} is asking for these permissions:\n\n"

-                      "Builder: {1} -> {2}\n"

-                      "Admin: {3} -> {4}\n\n"

-                      "Project: {5}\n"

-                      "Owner: {6}".format(

-                          applicant.name,

-                          helpers.PermissionEnum(permission_dict.get("old_builder", 0)),

-                          helpers.PermissionEnum(permission_dict.get("new_builder")),

-                          helpers.PermissionEnum(permission_dict.get("old_admin", 0)),

-                          helpers.PermissionEnum(permission_dict.get("new_admin")),

-                          copr.name,

-                          copr.owner_name))

+         self.subject = "[Copr] {0}: {1} is requesting permissions change".format(copr.full_name, applicant.name)

+ 

+         self.text = "{0} asked for these changes:\n\n".format(applicant.name)

+ 

+         for perm in ['Builder', 'Admin']:

+             old = permission_dict.get("old_"+perm.lower())

+             new = permission_dict.get("new_"+perm.lower())

+ 

+             if old != new:

+                 if old is None:

+                     old = 0 # previously unset

+                 self.text += "{0}: {1} -> {2}\n".format(

+                     perm,

+                     helpers.PermissionEnum(old),

+                     helpers.PermissionEnum(new),

+                 )

+ 

+         self.text += "\nProject: {0}".format(copr.full_name)

  

  

  class PermissionChangeMessage(Message):
@@ -39,18 +47,20 @@

          :param models.Copr copr:

          :param dict permission_dict: {"old_builder": int, "old_admin": int, "new_builder": int, "new_admin": int}

          """

-         self.subject = "[Copr] {0}: Your permissions have changed".format(copr.name)

-         self.text = (

-             "Your permissions have changed:\n\n"

-             "Builder: {0} -> {1}\n"

-             "Admin: {2} -> {3}\n\n"

-             "Project: {4}\n"

-             "Owner: {5}".format(

-                 helpers.PermissionEnum(permission_dict["old_builder"]),

-                 helpers.PermissionEnum(permission_dict["new_builder"]),

-                 helpers.PermissionEnum(permission_dict["old_admin"]),

-                 helpers.PermissionEnum(permission_dict["new_admin"]),

-                 copr.name, copr.user.name))

+         self.subject = "[Copr] {0}: Your permissions have changed".format(copr.full_name)

+         self.text = "Your permissions have changed:\n\n"

+ 

+         for perm in ['Builder', 'Admin']:

+             old = permission_dict.get("old_"+perm.lower())

+             new = permission_dict.get("new_"+perm.lower())

+             if old != new:

+                 if old is None:

+                     old = 0 # previously unset

+                 self.text += "{0}: {1} -> {2}\n".format(

+                     perm, helpers.PermissionEnum(old),

+                     helpers.PermissionEnum(new))

+ 

+         self.text += "\nProject: {0}".format(copr.full_name)

  

  

  class LegalFlagMessage(Message):

@@ -497,6 +497,13 @@

              return "will be deleted ASAP"

          return "will be deleted after {} days".format(self.delete_after_days)

  

@property
def admin_mails(self):
    """
    E-mail addresses of the owner and of every approved admin.

    :return: list starting with ``self.user.mail`` followed by the mail of
        each user whose ``copr_admin`` equals ``PermissionEnum('approved')``.
    """
    approved = helpers.PermissionEnum('approved')
    owner_mail = [self.user.mail]
    approved_admins = [perm.user.mail for perm in self.copr_permissions
                       if perm.copr_admin == approved]
    return owner_mail + approved_admins

  

  class CoprPermission(db.Model, helpers.Serializer):

      """
@@ -515,6 +522,21 @@

      copr_id = db.Column(db.Integer, db.ForeignKey("copr.id"), primary_key=True)

      copr = db.relationship("Copr", backref=db.backref("copr_permissions"))

  

def set_permission(self, name, value):
    """
    Store ``value`` in the column belonging to role ``name``.

    :param str name: 'admin' (``copr_admin``) or 'builder' (``copr_builder``)
    :param value: the value assigned to the column
    :raises KeyError: for any other ``name``
    """
    role_columns = (('admin', 'copr_admin'), ('builder', 'copr_builder'))
    for role, column in role_columns:
        if name == role:
            setattr(self, column, value)
            return
    raise KeyError("{0} is not a valid copr permission".format(name))

+ 

def get_permission(self, name):
    """
    Read the column belonging to role ``name``.

    :param str name: 'admin' (``copr_admin``) or 'builder' (``copr_builder``)
    :return: the column value, or 0 when the column is None
    :raises KeyError: for any other ``name``
    """
    role_columns = (('admin', 'copr_admin'), ('builder', 'copr_builder'))
    for role, column in role_columns:
        if name == role:
            stored = getattr(self, column)
            return stored if stored is not None else 0
    raise KeyError("{0} is not a valid copr permission".format(name))

+ 

  

  class CoprDir(db.Model):

      """

@@ -6,7 +6,7 @@

  from functools import wraps

  from werkzeug.datastructures import ImmutableMultiDict

  from coprs import app

- from coprs.exceptions import CoprHttpException, ObjectNotFound

+ from coprs.exceptions import CoprHttpException, ObjectNotFound, AccessRestricted

  from coprs.logic.complex_logic import ComplexLogic

  

  
@@ -144,3 +144,18 @@

  

      def to_dict(self):

          return [x.to_dict() for x in self.get()]

+ 

def editable_copr(f):
    """
    Decorator for views routed as ``<ownername>/<projectname>``.

    The project is looked up with ``get_copr`` and passed to ``f`` as its
    first argument in place of the two names.

    :raises AccessRestricted: when ``flask.g.user.can_edit(copr)`` is false.
    """
    @wraps(f)
    def wrapper(ownername, projectname, **kwargs):
        copr = get_copr(ownername, projectname)
        if flask.g.user.can_edit(copr):
            return f(copr, **kwargs)

        project = '/'.join([ownername, projectname])
        raise AccessRestricted(
            "User '{0}' can not see permissions for project '{1}' "
            "(missing admin rights)".format(flask.g.user.name, project))
    return wrapper

@@ -0,0 +1,101 @@

+ import flask

+ 

+ from coprs.views.apiv3_ns import apiv3_ns

+ from coprs.views.misc import api_login_required

+ from coprs.exceptions import ObjectNotFound, BadRequest

+ from coprs.helpers import PermissionEnum

+ from coprs.logic.coprs_logic import CoprPermissionsLogic

+ from coprs.mail import send_mail, PermissionRequestMessage, PermissionChangeMessage

+ from coprs import db_session_scope, models

+ 

+ from . import GET, PUT, editable_copr, get_copr

+ 

+ 

@apiv3_ns.route("/project/permissions/get/<ownername>/<projectname>", methods=GET)
@api_login_required
@editable_copr
def get_permissions(copr):
    """
    Return the roles of every user having a permission record in ``copr``.

    :return: JSON ``{'permissions': {username: {'admin': ..., 'builder': ...}}}``
    :raises ObjectNotFound: when the project has no permission records.
    """
    records = copr.copr_permissions
    if not records:
        raise ObjectNotFound(
            "No permissions set on {0} project".format(copr.full_name))

    permissions = {
        record.user.name: {
            'admin': PermissionEnum(record.copr_admin),
            'builder': PermissionEnum(record.copr_builder),
        }
        for record in copr.copr_permissions
    }
    return flask.jsonify({'permissions': permissions})

+ 

+ 

@apiv3_ns.route("/project/permissions/set/<ownername>/<projectname>", methods=PUT)
@api_login_required
@editable_copr
def set_permissions(copr):
    """
    Change roles of one or more users in ``copr``.

    Expects a JSON body of the form
    ``{'username': {'admin': 'nothing', 'builder': 'request'}, ...}``.

    :return: JSON ``{'updated': [usernames whose roles changed]}``
    :raises BadRequest: for a malformed body, an unknown user, or any
        error raised by ``CoprPermissionsLogic.set_permissions``.
    """
    permissions = flask.request.get_json()
    if not isinstance(permissions, dict):
        raise BadRequest(
            "request is not a dictionary, expected format: "
            "{'username': {'admin': 'nothing', 'builder': 'request'} ...}")

    if not permissions:
        raise BadRequest("no permission change requested")

    updated = []
    messages = []
    with db_session_scope():
        for username, perm_set in permissions.items():
            user = models.User.query.filter_by(username=username).first()
            if not user:
                raise BadRequest("user '{0}' doesn't exist in database".format(
                    username))

            # Without this check a non-dict value (e.g. a bare string)
            # would fail on .items() with an AttributeError.
            if not isinstance(perm_set, dict):
                raise BadRequest(
                    "permissions for user '{0}' are not a dictionary, "
                    "expected format: {{'admin': 'nothing', "
                    "'builder': 'request'}}".format(username))

            permission_dict = {}
            for perm, state in perm_set.items():
                change = CoprPermissionsLogic.set_permissions(
                    flask.g.user, copr, user, perm, state)
                if change:
                    permission_dict['old_'+perm] = change[0]
                    permission_dict['new_'+perm] = change[1]

            if permission_dict:
                updated.append(username)
                msg = PermissionChangeMessage(copr, permission_dict)
                messages.append({'address': user.mail, 'message': msg})

    # send emails only if transaction succeeded
    if flask.current_app.config.get("SEND_EMAILS", False):
        for task in messages:
            send_mail(task['address'], task['message'])

    return flask.jsonify({'updated': updated})

+ 

+ 

@apiv3_ns.route("/project/permissions/request/<ownername>/<projectname>", methods=PUT)
@api_login_required
def request_permissions(ownername, projectname):
    """
    Let the logged-in user request or drop roles in a project.

    Expects a JSON body of the form ``{'admin': True, 'builder': False}``.
    When something changed, the addresses in ``copr.admin_mails`` are
    mailed, provided the SEND_EMAILS option is enabled.

    :return: JSON ``{'updated': bool}`` telling whether anything changed
    :raises BadRequest: for a malformed or empty body, or any error raised
        by ``CoprPermissionsLogic.request_permission``.
    """
    copr = get_copr(ownername, projectname)
    roles = flask.request.get_json()
    if not isinstance(roles, dict):
        raise BadRequest("invalid 'roles' dict format, expected: "
                         "{'admin': True, 'builder': False}")
    if not roles:
        raise BadRequest("no permission requested")

    permission_dict = {}
    with db_session_scope():
        for permission in roles:
            change = CoprPermissionsLogic.request_permission(
                copr, flask.g.user, permission, roles[permission])
            if not change:
                continue
            old_state, new_state = change
            permission_dict['old_'+permission] = old_state
            permission_dict['new_'+permission] = new_state

    if permission_dict:
        msg = PermissionRequestMessage(copr, flask.g.user, permission_dict)
        for address in copr.admin_mails:
            if not flask.current_app.config.get("SEND_EMAILS", False):
                continue
            send_mail(address, msg)

    return flask.jsonify({'updated': bool(permission_dict)})

@@ -515,15 +515,10 @@

          flask.flash(

              "Successfully updated permissions for project '{0}'."

              .format(copr.name))

-         admin_mails = [copr.user.mail]

-         for perm in copr.copr_permissions:

-             # this 2 means that his status (admin) is approved

-             if perm.copr_admin == 2:

-                 admin_mails.append(perm.user.mail)

  

          # sending emails

          if flask.current_app.config.get("SEND_EMAILS", False):

-             for mail in admin_mails:

+             for mail in copr.admin_mails:

                  permission_dict = {"old_builder": old_builder, "old_admin": old_admin,

                                     "new_builder": new_builder, "new_admin": new_admin}

                  msg = PermissionRequestMessage(copr, flask.g.user, permission_dict)

@@ -395,6 +395,24 @@

              copr_builder=helpers.PermissionEnum("request"),

              copr_admin=helpers.PermissionEnum("approved"))

  

+ 

@pytest.fixture
def f_copr_more_permissions(self, f_copr_permissions):
    """
    Extend ``f_copr_permissions`` with user4, an approved builder
    (and nothing more) on ``self.c3``.
    """
    self.u4 = models.User(
        username=u"user4",
        proven=False,
        mail="baasdfz@bar.bar",
        api_token='u4xxx',
        api_login='u4login',
        api_token_expiration=datetime.date.today() + datetime.timedelta(days=1000))

    # only a builder
    self.cp4 = models.CoprPermission(
        copr=self.c3,
        user=self.u4,
        copr_builder=helpers.PermissionEnum("approved"),
        copr_admin=helpers.PermissionEnum("nothing"))

    # Add the objects created here explicitly instead of relying on
    # relationship cascades to pull them into the session.
    self.db.session.add_all(
        [self.u4, self.cp1, self.cp2, self.cp3, self.cp4])

  

      @pytest.fixture
@@ -482,11 +500,19 @@

              }

          )

  

def api3_auth_headers(self, user):
    """
    Build the request headers for an authenticated APIv3 JSON call.

    :param user: object providing ``api_login`` and ``api_token``
    :return: dict with the 'Authorization' and 'Content-Type' headers
    """
    authorization = self._get_auth_string(user.api_login, user.api_token)
    return {
        "Authorization": authorization,
        "Content-Type": "application/json",
    }

+ 

      def post_api3_with_auth(self, url, content, user):

-         headers = {"Authorization": self._get_auth_string(user.api_login, user.api_token),

-                    "Content-Type": "application/json"}

+         headers = self.api3_auth_headers(user)

          return self.tc.post(url, data=json.dumps(content), headers=headers)

  

def get_api3_with_auth(self, url, user):
    """
    GET ``url`` with ``user``'s APIv3 credentials and return the test
    client's response.
    """
    # The leftover debugging print of the headers was dropped: it wrote
    # the Authorization header to stdout on every call.
    headers = self.api3_auth_headers(user)
    return self.tc.get(url, headers=headers)

+ 

  

  class TransactionDecorator(object):

  

@@ -0,0 +1,226 @@

+ import json

+ from unittest.mock import patch, MagicMock

+ 

+ from coprs.models import User, Copr

+ 

+ from tests.coprs_test_case import CoprsTestCase

+ 

+ from coprs.views.apiv3_ns import apiv3_projects

+ 

+ 

+ class TestApiV3Permissions(CoprsTestCase):

+ 

+     def auth_get(self, path, user):

+         base = "/api_3/project/permissions"

+         return self.get_api3_with_auth(base + path, user)

+ 

+     def auth_post(self, path, data, user):

+         base = "/api_3/project/permissions"

+         return self.post_api3_with_auth(base + path, data, user)

+ 

+     def test_perms_get_require_admin(self, f_users, f_coprs, f_mock_chroots,

+                                      f_copr_more_permissions, f_users_api, f_db):

+ 

+         r = self.tc.get("/api_3/project/permissions/get/user2/foocopr")

+         assert r.status_code == 401

+ 

+         # user3 is "nothing:nothing" on c3

+         u = User.query.filter_by(username='user3').first()

+         r = self.auth_get("/get/user2/barcopr", u)

+         assert r.status_code == 403

+ 

+         # user4 is only a builder in c3 (user2/barcopr)

+         u = User.query.filter_by(username='user4').first()

+         r = self.auth_get("/get/user2/barcopr", u)

+         assert r.status_code == 403

+ 

+     def test_no_permission_set(self, f_users, f_coprs, f_mock_chroots,

+                                f_copr_permissions, f_users_api, f_db):

+         # u1 is authorized, but no permission is set on c1 copr

+         u1 = User.query.filter_by(username='user1').first()

+         r = self.get_api3_with_auth(

+             "/api_3/project/permissions/get/user1/foocopr",

+             u1)

+         assert r.status_code == 404

+         assert 'No permissions set on' in json.loads(r.data)['error']

+ 

+     def test_perms_accessible_by_user(self, f_users, f_coprs, f_mock_chroots,

+                                       f_copr_permissions, f_users_api, f_db):

+         # test owner

+         exp_data = {'permissions': {'user1': {'admin': 'nothing', 'builder': 'approved'}}}

+         u = User.query.filter_by(username='user2').first()

+         r = self.get_api3_with_auth(

+             "/api_3/project/permissions/get/user2/foocopr", u)

+         assert r.status_code == 200

+         assert json.loads(r.data) == exp_data

+ 

+         u = User.query.filter_by(username='user1').first()

+         r = self.get_api3_with_auth(

+             "/api_3/project/permissions/get/user2/foocopr", u)

+         assert r.status_code == 200

+         assert json.loads(r.data) == exp_data

+ 

+     def test_perms_set_require_admin(self, f_users, f_coprs, f_mock_chroots,

+                                      f_copr_more_permissions, f_users_api, f_db):

+ 

+         r = self.tc.post("/api_3/project/permissions/set/some/non-existent",

+                          data="something")

+         assert r.status_code == 401

+ 

+         # even authorized non-admin user isn't able to set the permissions

+         u3 = User.query.filter_by(username='user3').first()

+         r = self.post_api3_with_auth(

+             "/api_3/project/permissions/set/user2/foocopr", {}, u3)

+         assert r.status_code == 403

+ 

+         # even an authorized builder (user4, not an admin) isn't able to set the permissions

+         u = User.query.filter_by(username='user4').first()

+         r = self.auth_post("/set/user2/barcopr", {}, u)

+         assert r.status_code == 403

+ 

+     def test_set_bad_data(self, f_users, f_coprs, f_mock_chroots,

+                           f_copr_permissions, f_users_api, f_db):

+         # test owner

+         u = User.query.filter_by(username='user2').first()

+         r = self.auth_post("/set/user2/barcopr", {}, u)

+         assert r.status_code == 400

+ 

+         r = self.auth_post("/set/user2/barcopr", {'non_existent': {'admin': 'approved'}}, u)

+         assert r.status_code == 400

+ 

+         r = self.auth_post("/set/user2/barcopr", {'user2': {'admin': 'approved'}}, u)

+         assert r.status_code == 400

+         print(r.data)

+ 

+     def test_settable(self, f_users, f_coprs, f_mock_chroots,

+                       f_copr_more_permissions, f_users_api, f_db):

+         # by owner

+         exp_data = {'permissions': {'user1': {'admin': 'nothing', 'builder': 'approved'}}}

+         u = User.query.filter_by(username='user2').first()

+         perms = {'user4': {'admin': 'approved'}}

+         r = self.auth_post("/set/user2/foocopr", perms, u)

+         assert r.status_code == 200

+         assert json.loads(r.data) == {'updated': ['user4']}

+ 

+         # by owner repeated

+         u = User.query.filter_by(username='user2').first()

+         r = self.auth_post("/set/user2/foocopr", perms, u)

+         assert r.status_code == 200

+         assert json.loads(r.data) == {'updated': []}

+ 

+         # test admin

+         u = User.query.filter_by(username='user1').first()

+         perms = {'user3': {'builder': 'approved'}}

+         r = self.auth_post("/set/user2/barcopr", perms, u)

+         assert r.status_code == 200

+         assert json.loads(r.data) == {'updated': ['user3']}

+ 

+     def test_cant_readd_owner(self, f_users, f_coprs, f_mock_chroots,

+                               f_copr_more_permissions, f_users_api, f_db):

+         u = User.query.filter_by(username='user1').first()

+         perms = {'user2': {'builder': 'approved'}}

+         r = self.auth_post("/set/user2/barcopr", perms, u)

+         assert r.status_code == 400

+         assert 'is owner of the' in json.loads(r.data)['error']

+ 

+     def test_request_invalid(self, f_users, f_coprs, f_mock_chroots,

+                              f_copr_more_permissions, f_users_api, f_db):

+         r = self.tc.get("/api_3/project/permissions/request/user2/foocopr")

+         assert r.status_code == 405

+         r = self.tc.post("/api_3/project/permissions/request/user2/foocopr",

+                          content_type="application/json",

+                          data={})

+         assert r.status_code == 401

+ 

+         u = User.query.filter_by(username='user1').first()

+         invalid_request = {'admin': 1}

+         r = self.auth_post("/request/user2/foocopr", invalid_request, u)

+         assert r.status_code == 400

+         assert "invalid 'admin' permission request" in json.loads(r.data)['error']

+ 

+         u = User.query.filter_by(username='user1').first()

+         invalid_request = {'admin_invalid': True}

+         r = self.auth_post("/request/user2/foocopr", invalid_request, u)

+         assert r.status_code == 400

+         assert "invalid permission 'admin_invalid'" in json.loads(r.data)['error']

+ 

+         # u1 is already admin

+         u = User.query.filter_by(username='user1').first()

+         r = self.auth_post("/request/user2/barcopr", {}, u)

+         assert r.status_code == 400

+         assert 'no permission' in json.loads(r.data)['error']

+ 

+         # u2 is owner

+         u = User.query.filter_by(username='user2').first()

+         r = self.auth_post("/request/user2/barcopr", {'admin': False}, u)

+         assert r.status_code == 400

+         assert 'is owner' in json.loads(r.data)['error']

+ 

+     def test_request_valid(self, f_users, f_coprs, f_mock_chroots,

+                            f_copr_more_permissions, f_users_api, f_db):

+         u = User.query.filter_by(username='user4').first()

+         permissions = {'admin': True}

+         r = self.auth_post("/request/user2/barcopr", permissions, u)

+         assert r.status_code == 200

+         assert json.loads(r.data)['updated'] == True

+ 

+         # re-request, but no update

+         u = User.query.filter_by(username='user4').first()

+         r = self.auth_post("/request/user2/barcopr", permissions, u)

+         assert r.status_code == 200

+         assert json.loads(r.data)['updated'] == False

+ 

+     @patch('coprs.views.apiv3_ns.apiv3_permissions.send_mail',

+            new_callable=MagicMock())

+     def test_perms_set_sends_emails(

+             self, send_mail, f_users, f_coprs, f_copr_more_permissions,

+             f_users_api, f_db):

+         self.app.config['SEND_EMAILS'] = True

+ 

+         u = User.query.filter_by(username='user1').first()

+         perms = {'user4': {'admin': 'approved'}}

+         r = self.auth_post("/set/user2/barcopr", perms, u)

+ 

+         msg = (

+             "[Copr] user2/barcopr: Your permissions have changed\n\n"

+             "Your permissions have changed:\n\n"

+             "Admin: nothing -> approved\n\n"

+             "Project: user2/barcopr")

+ 

+         assert len(send_mail.call_args_list) == 1

+         assert str(send_mail.call_args_list[0][0][1]) == msg

+ 

+     @patch('coprs.views.apiv3_ns.apiv3_permissions.send_mail',

+            new_callable=MagicMock())

+     def test_perms_set_sends_emails(

+             self, send_mail, f_users, f_coprs, f_copr_more_permissions,

+             f_users_api, f_db):

+         self.app.config['SEND_EMAILS'] = True

+ 

+         # u4 is only a builder in c3 so far, request admin!  but owner of c3 is

+         # u2, and one additional admin u1 (two messages)

+         u = User.query.filter_by(username='user4').first()

+         permissions = {'admin': True}

+         r = self.auth_post('/request/user2/barcopr', permissions, u)

+         assert r.status_code == 200

+ 

+         msg = "\n\n".join([

+             "[Copr] user2/barcopr: user4 is requesting permissions change",

+             "user4 asked for these changes:",

+             "Admin: nothing -> request",

+             "Project: user2/barcopr"])

+ 

+         emails = ['user1@spam.foo', 'user2@spam.foo']

+         calls = send_mail.call_args_list

+         assert len(calls) == 2

+         assert calls[0][0][0] == "user2@spam.foo"

+         assert calls[1][0][0] == "user1@foo.bar"

+         assert str(calls[0][0][1]) == msg

+         assert str(calls[1][0][1]) == msg

+ 

+         # re-request without errors, but no new mail!

+         u = User.query.filter_by(username='user4').first()

+         permissions = {'admin': True}

+         r = self.auth_post('/request/user2/barcopr', permissions, u)

+         assert r.status_code == 200

+         assert len(calls) == 2

@@ -8,21 +8,37 @@

  class TestMail(CoprsTestCase):

      def test_permissions_request_message(self, f_users, f_coprs, f_copr_permissions, f_db):

          msg = PermissionRequestMessage(self.c1, self.u2, {"new_builder": 1, "new_admin": 0})

-         assert msg.subject == "[Copr] foocopr: user2 is asking permissions"

-         assert msg.text == ("user2 is asking for these permissions:\n\n"

+         assert msg.subject == "[Copr] user1/foocopr: user2 is requesting permissions change"

+         assert msg.text == ("user2 asked for these changes:\n\n"

                              "Builder: nothing -> request\n"

                              "Admin: nothing -> nothing\n\n"

-                             "Project: foocopr\n"

-                             "Owner: user1")

+                             "Project: user1/foocopr")

+ 

+         msg = PermissionRequestMessage(self.c1, self.u2, {"old_admin": 1, "new_admin": 0})

+         assert msg.subject == "[Copr] user1/foocopr: user2 is requesting permissions change"

+         assert msg.text == ("user2 asked for these changes:\n\n"

+                             "Admin: request -> nothing\n\n"

+                             "Project: user1/foocopr")

  

      def test_permissions_change_message(self, f_users, f_coprs, f_copr_permissions, f_db):

          msg = PermissionChangeMessage(self.c1, {"old_builder": 0, "old_admin": 2, "new_builder": 2, "new_admin": 0})

-         assert msg.subject == "[Copr] foocopr: Your permissions have changed"

+         assert msg.subject == "[Copr] user1/foocopr: Your permissions have changed"

          assert msg.text == ("Your permissions have changed:\n\n"

                              "Builder: nothing -> approved\n"

                              "Admin: approved -> nothing\n\n"

-                             "Project: foocopr\n"

-                             "Owner: user1")

+                             "Project: user1/foocopr")

+ 

+         msg = PermissionChangeMessage(self.c1, {"old_builder": 0, "new_builder": 1})

+         assert msg.subject == "[Copr] user1/foocopr: Your permissions have changed"

+         assert msg.text == ("Your permissions have changed:\n\n"

+                             "Builder: nothing -> request\n\n"

+                             "Project: user1/foocopr")

+ 

+         msg = PermissionChangeMessage(self.c1, {"old_admin": 1, "new_admin": 0})

+         assert msg.subject == "[Copr] user1/foocopr: Your permissions have changed"

+         assert msg.text == ("Your permissions have changed:\n\n"

+                             "Admin: request -> nothing\n\n"

+                             "Project: user1/foocopr")

  

      def test_legal_flag_message(self, f_users, f_coprs, f_db):

          app.config["SERVER_NAME"] = "localhost"

file modified
+1 -1
@@ -6,7 +6,7 @@

  persistent=no

  

  extension-pkg-whitelist = SQLAlchemy

- ignored-modules = alembic*, sqlalchemy, SQLAlchemy

+ ignored-modules = alembic*, sqlalchemy*, SQLAlchemy*

  

  [MESSAGES CONTROL]

  

@@ -1,7 +1,7 @@

  from __future__ import absolute_import

  

  from . import BaseProxy

- from ..requests import Request, munchify, POST

+ from ..requests import Request, munchify, POST, GET, PUT

  

  

  class ProjectProxy(BaseProxy):
@@ -200,3 +200,94 @@

                            params=params, data=data, auth=self.auth)

          response = request.send()

          return munchify(response)

+ 

+     def get_permissions(self, ownername, projectname):

+         """

+         Get project permissions

+ 

+         :param str ownername: owner of the project

+         :param str projectname: name of the project

+         :return Munch: a dictionary in format

+             ``{"permissions": {username: {permission: state, ... }, ...}}``

+             where ``username`` identifies an existing copr user, ``permission``

+             is one of ``admin|builder`` and ``state`` is one of

+             ``nothing|approved|request``.

+         """

+ 

+         endpoint = "/project/permissions/get/{ownername}/{projectname}/"

+         params = {

+             "ownername": ownername,

+             "projectname": projectname,

+         }

+         request = Request(

+                 endpoint,

+                 api_base_url=self.api_base_url,

+                 auth=self.auth,

+                 method=GET,

+                 params=params)

+ 

+         response = request.send()

+         return munchify(response)

+ 

+     def set_permissions(self, ownername, projectname, permissions):

+         """

+         Set (or change) permissions for a project

+ 

+         :param str ownername: owner of the updated project

+         :param str projectname: name of the updated project

+         :param dict permissions: the expected format is

+             ``{username: {permission: state, ...}, ...}``

+             where ``username`` identifies an existing copr user, ``permission``

+             is one of ``builder|admin`` and ``state`` value is one of

+             ``nothing|request|approved``.  It is OK to set only ``builder`` or

+             only ``admin`` permission; any unspecified ``permission`` is then

+             (a) set to ``nothing`` (if the permission entry is newly created),

+             or (b) kept unchanged (if an existing permission entry is edited).

+             If more than one ``username`` is specified in single

+             ``set_permissions()`` request, the approach is to configure

+             *all-or-nothing* (any error makes the whole operation fail and

+             no-op).

+         """

+ 

+         endpoint = "/project/permissions/set/{ownername}/{projectname}/"

+         params = {

+             "ownername": ownername,

+             "projectname": projectname,

+         }

+         request = Request(

+                 endpoint,

+                 api_base_url=self.api_base_url,

+                 auth=self.auth,

+                 method=PUT,

+                 params=params,

+                 data=permissions)

+ 

+         request.send()

+ 

+     def request_permissions(self, ownername, projectname, permissions):

+         """

+         Request/cancel request/drop your permissions on project

+ 

+         :param str ownername: owner of the requested project

+         :param str projectname: name of the requested project

+         :param dict permissions: the desired permissions user wants to have on

+             the requested copr project.  The format is

+             ``{permission: bool, ...}``, where ``permission`` is one of

+             ``builder|admin`` and ``bool`` is

+             (a) ``True`` for *requesting* the role or

+             (b) ``False`` for *dropping* the role (or for *cancelling* of

+             previous request).

+         """

+         endpoint = "/project/permissions/request/{ownername}/{projectname}/"

+         params = {

+             "ownername": ownername,

+             "projectname": projectname,

+         }

+         request = Request(

+                 endpoint,

+                 api_base_url=self.api_base_url,

+                 auth=self.auth,

+                 method=PUT,

+                 params=params,

+                 data=permissions)

+ 

+         request.send()

@@ -11,6 +11,7 @@

  

  GET = "GET"

  POST = "POST"

+ PUT = "PUT"

  

  

  class Request(object):

Metadata Update from @praiskup:
- Pull-request tagged with: needs-work

5 years ago

Sometimes you use capital letter, sometimes not. It would be nice to stick to one choice.

Sometimes you use capital letter, sometimes not. It would be nice to stick to one choice.

To be precise, I wanted parser.help to be capitalized and parser.argument to be lowercase (because I like the non-capitalized version more), and the two are not mixed in a single --help output. Though I agree that we should be consistent, so I'll switch to a capital letter everywhere (most of the old code seems to be written that way).

rebased onto 316973239116c6a97953e25c7edfd674efd47e0d

5 years ago

rebased onto bce8649a7db3b70d76dd1c7692f82f69a67389ac

5 years ago

rebased onto bce8649a7db3b70d76dd1c7692f82f69a67389ac

5 years ago

Metadata Update from @praiskup:
- Pull-request untagged with: needs-work

5 years ago

rebased onto ac4ddec

5 years ago

Pull-Request has been merged by msuchy

5 years ago