#342 Add canceling waiting composes
Merged 4 years ago by lsedlar. Opened 4 years ago by lsedlar.
lsedlar/odcs cancel-compose  into  master

file modified
+25 -13
@@ -249,6 +249,30 @@ 

          raise ValueError('Unknown backend name {0}.'.format(backend))

  

  

+ def has_role(role):

+     """Check if current user has given role. With noauth all users have all

+     roles.

+ 

+     :returns: bool

+     """

+     if conf.auth_backend == 'noauth':

+         return True

+ 

+     groups = []

+     for group in getattr(conf, role).get('groups', []):

+         groups.append(group)

+ 

+     users = []

+     for user in getattr(conf, role).get('users', []):

+         users.append(user)

+ 

+     in_groups = bool(set(flask.g.groups) & set(groups))

+     in_users = flask.g.user.username in users

+     if in_groups or in_users:

+         return True

+     return False

+ 

+ 

  def requires_role(role):

      """Check if user is in the configured role.

  
@@ -261,21 +285,9 @@ 

      def wrapper(f):

          @wraps(f)

          def wrapped(*args, **kwargs):

-             if conf.auth_backend == 'noauth':

+             if has_role(role):

                  return f(*args, **kwargs)

  

-             groups = []

-             for group in getattr(conf, role).get('groups', []):

-                 groups.append(group)

- 

-             users = []

-             for user in getattr(conf, role).get('users', []):

-                 users.append(user)

- 

-             in_groups = bool(set(flask.g.groups) & set(groups))

-             in_users = flask.g.user.username in users

-             if in_groups or in_users:

-                 return f(*args, **kwargs)

              msg = "User %s is not in role %s." % (flask.g.user.username, role)

              log.error(msg)

              raise Forbidden(msg)

@@ -0,0 +1,26 @@ 

+ """Store celery task id

+ 

+ Revision ID: a855c39e2a0f

+ Revises: 82172e6a3154

+ Create Date: 2020-03-11 08:25:27.973866

+ 

+ """

+ 

+ # revision identifiers, used by Alembic.

+ revision = 'a855c39e2a0f'

+ down_revision = '82172e6a3154'

+ 

+ from alembic import op

+ import sqlalchemy as sa

+ 

+ 

+ def upgrade():

+     # ### commands auto generated by Alembic - please adjust! ###

+     op.add_column('composes', sa.Column('celery_task_id', sa.String(), nullable=True))

+     # ### end Alembic commands ###

+ 

+ 

+ def downgrade():

+     # ### commands auto generated by Alembic - please adjust! ###

+     op.drop_column('composes', 'celery_task_id')

+     # ### end Alembic commands ###

@@ -157,6 +157,8 @@ 

      pungi_compose_id = db.Column(db.String, nullable=True)

      # Full Pungi configuration dump, used only for raw_config source type.

      pungi_config_dump = db.Column(db.String, nullable=True)

+     # UUID of the celery task.

+     celery_task_id = db.Column(db.String, nullable=True)

  

      @classmethod

      def create(cls, session, owner, source_type, source, results,
@@ -227,6 +229,8 @@ 

              # Set pungi_compose_id to None, because it is regenerated once

              # this copied Compose is started.

              pungi_compose_id=None,

+             # Also reset celery task_id

+             celery_task_id=None,

          )

          session.add(compose)

          return compose

file modified
+64 -30
@@ -28,7 +28,7 @@ 

  from werkzeug.exceptions import BadRequest

  

  from odcs.server import app, db, log, conf, version

- from odcs.server.errors import NotFound

+ from odcs.server.errors import NotFound, Forbidden

  from odcs.server.models import Compose

  from odcs.common.types import (

      COMPOSE_RESULTS, COMPOSE_FLAGS, COMPOSE_STATES, PUNGI_SOURCE_TYPE_NAMES,
@@ -36,12 +36,12 @@ 

  from odcs.server.api_utils import (

      pagination_metadata, filter_composes, validate_json_data,

      raise_if_input_not_allowed)

- from odcs.server.auth import requires_role, login_required

+ from odcs.server.auth import requires_role, login_required, has_role

  from odcs.server.auth import require_scopes

  

  try:

      from odcs.server.celery_tasks import (

-         generate_pulp_compose, generate_pungi_compose)

+         celery_app, generate_pulp_compose, generate_pungi_compose)

      CELERY_AVAILABLE = True

  except ImportError:

      log.exception(
@@ -439,49 +439,83 @@ 

  

          if CELERY_AVAILABLE and conf.celery_broker_url:

              if source_type == PungiSourceType.PULP:

-                 generate_pulp_compose.delay(compose.id)

+                 result = generate_pulp_compose.delay(compose.id)

              else:

-                 generate_pungi_compose.delay(compose.id)

+                 result = generate_pungi_compose.delay(compose.id)

+ 

+             compose.celery_task_id = result.id

+             db.session.add(compose)

+             db.session.commit()

  

          return jsonify(compose.json()), 200

  

      @login_required

      @require_scopes('delete-compose')

-     @requires_role('admins')

      def delete(self, id):

-         """ Marks compose as expired to be removed later from ODCS storage.

-         The compose metadata are still stored in the ODCS database, only the

-         composed files stored in ODCS storage are removed.

+         """Cancels waiting compose or marks finished compose as expired to be

+         removed later from ODCS storage. The compose metadata are still stored

+         in the ODCS database, only the composed files stored in ODCS storage

+         are removed.

+ 

+         Users are allowed to cancel their own composes. Deleting is limited to

+         admins. Admins can also cancel any compose.

  

          :query number id: :ref:`ID<id>` of the compose to delete.

  

-         :statuscode 200: Compose updated and returned.

-         :statuscode 400: Compose is not in done or failed :ref:`state<state>`.

+         :statuscode 202: Compose updated and returned.

+         :statuscode 400: Compose is not in wait, done or failed :ref:`state<state>`.

          :statuscode 401: User is unathorized.

+         :statuscode 402: User doesn't own the compose to be cancelled or is not admin

          :statuscode 404: Compose not found.

          """

          compose = Compose.query.filter_by(id=id).first()

-         if compose:

-             # can remove compose that is in state of 'done' or 'failed'

-             deletable_states = {n: COMPOSE_STATES[n] for n in ['done', 'failed']}

-             if compose.state not in deletable_states.values():

-                 raise BadRequest('Compose (id=%s) can not be removed, its state need to be in %s.' %

-                                  (id, deletable_states.keys()))

- 

-             # change compose.time_to_expire to now, so backend will

-             # delete this compose as it's an expired compose now

-             compose.time_to_expire = datetime.datetime.utcnow()

-             compose.removed_by = g.user.username

-             db.session.add(compose)

-             db.session.commit()

-             message = ("The delete request for compose (id=%s) has been accepted and will be"

-                        " processed by backend later." % compose.id)

-             response = jsonify({'status': 202,

-                                 'message': message})

+         if not compose:

+             raise NotFound('No such compose found.')

+ 

+         is_admin = has_role("admins")

+ 

+         # First try to cancel the compose

+         if CELERY_AVAILABLE and compose.state == COMPOSE_STATES["wait"]:

+             if not is_admin and compose.owner != g.user.username:

+                 raise Forbidden(

+                     "Compose (id=%s) can not be canceled, it's owned by someone else."

+                 )

+ 

+             # Revoke the task

+             if compose.celery_task_id:

+                 celery_app.control.revoke(compose.celery_task_id)

+             # Change compose status to failed

+             compose.transition(

+                 COMPOSE_STATES["failed"], "Canceled by %s" % g.user.username

+             )

+             message = "Compose (id=%s) has been canceled" % id

+             response = jsonify({"status": 202, "message": message})

              response.status_code = 202

              return response

-         else:

-             raise NotFound('No such compose found.')

+ 

+         # Compose was not eligible for cancellation, try to delete it instead.

+         # Only admins can do this.

+         if not is_admin:

+             raise Forbidden("User %s is not in role admins." % g.user.username)

+ 

+         # can remove compose that is in state of 'done' or 'failed'

+         deletable_states = {n: COMPOSE_STATES[n] for n in ['done', 'failed']}

+         if compose.state not in deletable_states.values():

+             raise BadRequest('Compose (id=%s) can not be removed, its state need to be in %s.' %

+                              (id, deletable_states.keys()))

+ 

+         # change compose.time_to_expire to now, so backend will

+         # delete this compose as it's an expired compose now

+         compose.time_to_expire = datetime.datetime.utcnow()

+         compose.removed_by = g.user.username

+         db.session.add(compose)

+         db.session.commit()

+         message = ("The delete request for compose (id=%s) has been accepted and will be"

+                    " processed by backend later." % compose.id)

+         response = jsonify({'status': 202,

+                             'message': message})

+         response.status_code = 202

+         return response

  

  

  class AboutAPI(MethodView):

file modified
+1 -1
@@ -117,7 +117,7 @@ 

              if c.name in ["id", "state", "state_reason", "time_to_expire",

                            "time_done", "time_submitted", "time_removed",

                            "removed_by", "reused_id", "koji_task_id",

-                           "time_started", "pungi_compose_id"]:

+                           "time_started", "pungi_compose_id", "celery_task_id"]:

                  assertMethod = self.assertNotEqual

              else:

                  assertMethod = self.assertEqual

file modified
+60 -2
@@ -30,7 +30,7 @@ 

  

  from werkzeug.exceptions import BadRequest

  from freezegun import freeze_time

- from mock import patch, PropertyMock

+ from mock import patch, PropertyMock, call

  

  import odcs.server.auth

  
@@ -913,7 +913,7 @@ 

  

      def test_delete_not_allowed_states_compose(self):

          for state in COMPOSE_STATES.keys():

-             if state not in ['done', 'failed']:

+             if state not in ['wait', 'done', 'failed']:

                  new_c = Compose.create(

                      db.session, "unknown", PungiSourceType.MODULE, "testmodule:master",

                      COMPOSE_RESULTS["repository"], 60)
@@ -951,6 +951,9 @@ 

          self.assertEqual(data['error'], 'Not Found')

  

      def test_delete_compose_with_non_admin_user(self):

+         self.c1.state = COMPOSE_STATES["failed"]

+         db.session.commit()

+ 

          with self.test_request_context(user='dev'):

              flask.g.oidc_scopes = [

                  '{0}{1}'.format(conf.oidc_base_namespace, 'delete-compose')
@@ -1441,3 +1444,58 @@ 

          self.assertEqual(c.source, 'pungi_cfg#hash')

          self.assertEqual(c.label, 'Beta-1.2')

          self.assertEqual(c.compose_type, 'nightly')

+ 

+ 

+ class TestViewsCancelCompose(ViewBaseTest):

+     maxDiff = None

+ 

+     def setup_test_data(self):

+         self.initial_datetime = datetime(

+             year=2016, month=1, day=1, hour=0, minute=0, second=0

+         )

+         with freeze_time(self.initial_datetime):

+             self.c1 = Compose.create(

+                 db.session, "dev2", PungiSourceType.MODULE, "testmodule:master",

+                 COMPOSE_RESULTS["repository"], 60)

+             db.session.commit()

+             self.task_id = "71267f28-5194-4720-b57b-a665fabdb012"

+             self.c1.celery_task_id = self.task_id

+             db.session.commit()

+             self.c1_id = self.c1.id

+ 

+     @patch("odcs.server.views.CELERY_AVAILABLE", new=False)

+     def test_no_celery(self):

+         with self.test_request_context(user='dev2'):

+             resp = self.client.delete("/api/1/composes/%s" % self.c1.id)

+         # Without celery we can't cancel, so the code should try to delete the

+         # compose and fail on user not being an admin.

+         self.assertEqual(resp.status_code, 403)

+         data = json.loads(resp.get_data(as_text=True))

+         self.assertEqual(data["status"], 403)

+         self.assertEqual(data["message"], "User dev2 is not in role admins.")

+ 

+     def test_bad_owner(self):

+         with self.test_request_context(user='dev1'):

+             resp = self.client.delete("/api/1/composes/%s" % self.c1.id)

+         self.assertEqual(resp.status_code, 403)

+         data = json.loads(resp.get_data(as_text=True))

+         self.assertEqual(data["status"], 403)

+         self.assertEqual(

+             data["message"],

+             "Compose (id=%s) can not be canceled, it's owned by someone else.",

+         )

+ 

+     @patch("odcs.server.views.celery_app")

+     def test_cancel(self, app):

+         with self.test_request_context(user='dev2'):

+             resp = self.client.delete("/api/1/composes/%s" % self.c1_id)

+         self.assertEqual(resp.status_code, 202)

+         self.assertEqual(app.mock_calls, [call.control.revoke(self.task_id)])

+         data = json.loads(resp.get_data(as_text=True))

+         self.assertEqual(data["status"], 202)

+         self.assertEqual(

+             data["message"], "Compose (id=%s) has been canceled" % self.c1_id

+         )

+         c1 = db.session.query(Compose).filter(Compose.id == self.c1_id).first()

+         self.assertEqual(c1.state, COMPOSE_STATES["failed"])

+         self.assertEqual(c1.state_reason, "Canceled by dev2")

Store celery task ID for each compose

Extend the DELETE method handler to allow users to cancel their own waiting composes. Admins can cancel anything. Deleting data is still limited to admins only.

It is possible that, while the revocation is happening, a worker will pick up the compose task, in which case the task will continue and finish. It's thus possible for a compose to go wait -> failed -> done.

It looks good to me. I was thinking about some way to check that the compose is in the "failed" state before possibly moving it to "done", but I think it does not hurt to keep this possible failed -> done transition.

2 new commits added

  • Add canceling waiting composes
  • Store celery task ID for each compose
4 years ago

I tested this on the dev server, and it seems to cancel fine. I'll add a state_reason to make it clear the compose was canceled.

I also tried canceling an old stale compose without a celery task, and nothing happened, so I think it doesn't have to be handled.

2 new commits added

  • Add canceling waiting composes
  • Store celery task ID for each compose
4 years ago

I also changed the state change to use the transition method. That should also take care of setting the timestamp for expiration (not that there's anything to delete, but still).

2 new commits added

  • Add canceling waiting composes
  • Store celery task ID for each compose
4 years ago

Pull-Request has been merged by lsedlar

4 years ago