#47 Better performing watchdog (v2)
Merged 3 years ago by frantisekz. Opened 3 years ago by frantisekz.

@@ -0,0 +1,41 @@ 

+ """WatchDog v2

+ 

+ Revision ID: 2ac7e20e2b30

+ Revises: 02a534c1cf42

+ Create Date: 2020-07-01 11:00:01.182375

+ 

+ """

+ from alembic import op

+ import sqlalchemy as sa

+ 

+ 

+ # revision identifiers, used by Alembic.

+ revision = '2ac7e20e2b30'

+ down_revision = '02a534c1cf42'

+ branch_labels = None

+ depends_on = None

+ 

+ 

+ def upgrade():

+     # ### commands auto generated by Alembic - please adjust! ###

+     op.drop_index(op.f('ix_watch_dog_data_kind'), table_name='watch_dog_data')

+     op.drop_index('idx_kind', table_name='watch_dog_data')

+     op.drop_table('watch_dog_data')

+     op.create_table('watch_dog_data',

+     sa.Column('id', sa.Integer(), nullable=False),

+     sa.Column('what', sa.Text(), nullable=True),

+     sa.Column('planned_at', sa.DateTime(), nullable=True),

+     sa.Column('started_at', sa.DateTime(), nullable=True),

+     sa.Column('ended_at', sa.DateTime(), nullable=True),

+     sa.Column('failed', sa.Boolean(), nullable=True),

+     sa.Column('message', sa.Text(), nullable=True),

+     sa.PrimaryKeyConstraint('id')

+     )

+     # ### end Alembic commands ###

+ 

+ 

+ def downgrade():

+     # ### commands auto generated by Alembic - please adjust! ###

+     op.drop_table('watch_dog_start_point')

+     op.drop_table('watch_dog_data')

+     # ### end Alembic commands ###

file modified
+12 -12
@@ -26,24 +26,24 @@ 

  

  class WatchDogData(db.Model):

      id = db.Column(db.Integer, primary_key=True)

-     kind = db.Column(db.String(16), unique=False, index=True)

-     what = db.Column(db.Text, unique=False)

-     when = db.Column(db.DateTime, unique=False)

-     message = db.Column(db.Text, unique=False)

+     what = db.Column(db.Text)

+     planned_at = db.Column(db.DateTime)

+     started_at = db.Column(db.DateTime)

+     ended_at = db.Column(db.DateTime)

+     failed = db.Column(db.Boolean)

+     message = db.Column(db.Text)

  

-     __table_args__ = (

-         db.Index('idx_kind', 'kind', postgresql_ops={'kind': 'text_pattern_ops'},),

-     )

- 

-     def __init__(self, kind, what, message):

-         self.kind = kind

+     def __init__(self, what, planned_at=None, started_at=None, ended_at=None, failed=None, message=None):

          self.what = what

-         self.when = datetime.utcnow()

+         self.planned_at = planned_at

+         self.started_at = started_at

+         self.ended_at = ended_at

+         self.failed = failed

          self.message = message

  

  class WatchDogStartPoint(db.Model):

      id = db.Column(db.Integer, primary_key=True)

-     last_seen_id = db.Column(db.Integer, unique=False)

+     last_seen_id = db.Column(db.Integer)

  

      def __init__(self, last_seen_id):

          self.last_seen_id = last_seen_id

@@ -141,7 +141,7 @@ 

              data = row.data

  

          # Store time of sync finish

-         #watchdog_utils.push_to_watchdog("sync_ended", what, None)

+         watchdog_utils.push_to_watchdog("sync_ended", what)

  

          if app.config['ENABLE_LOCAL_CACHE']:

              self._local_cache[what] = CachedObject(row.time_created, data)

@@ -47,7 +47,7 @@ 

      counter = redis_conn.incr(what)

      if counter == 1:

          app.logger.debug("PLANNING - celery_refresh on %s" % what)

-         #push_to_watchdog("sync_planned", what, None)

+         push_to_watchdog("sync_planned", what)

          celery_refresh.apply_async(args=args, kwargs=kwargs, priority=priority)

      else:

          app.logger.debug("celery_refresh already planned for %s " % what)
@@ -58,7 +58,7 @@ 

      app.logger.debug("START - celery_refresh on %s" % what)

      # run refresher

      try:

-         #push_to_watchdog("sync_started", what, None)

+         push_to_watchdog("sync_started", what)

          oraculum.CACHE._refresh(*args, **kwargs)

      except Exception as e:

          app.logger.error("ERROR - celery_refresh on %s\n%s" % (what, e))

@@ -23,39 +23,37 @@ 

  import re

  import datetime

  

+ from sqlalchemy import desc

+ 

  from oraculum.models.watchdog import WatchDogData, WatchDogStartPoint

  from oraculum.models.db_cache import CachedData

  from oraculum.utils.mailing_utils import send_mail

  

  from oraculum import app, db

  

- def push_to_watchdog(kind, what, message):

-     row = WatchDogData(kind, what, message)

-     db.session.add(row)

+ def push_to_watchdog(kind, what, message=None):

+     row = WatchDogData.query.filter_by(what=what).order_by(desc(WatchDogData.id)).first()

+     if not row:

+         row = WatchDogData(what)

+         db.session.add(row)

+ 

+     if kind == "sync_planned":

+         row.planned_at = datetime.datetime.utcnow()

+     elif kind == "sync_started":

+         row.started_at = datetime.datetime.utcnow()

+     elif kind == "sync_ended":

+         row.ended_at = datetime.datetime.utcnow()

+         row.failed = False

+     elif kind == "sync_failed":

+         row.ended_at = datetime.datetime.utcnow()

+         row.failed = True

+         row.message = message

      db.session.commit()

  

- def is_task_complete(started_task, ended_tasks):

-     """

-     Finds out if given started_task has ending counterpart in ended_tasks

-     """

-     for ended_task in ended_tasks:

-         if (started_task.what == ended_task.what) and (started_task.id < ended_task.id):

-             return True

-     return False

- 

  def process_queue():

      from oraculum.utils.celery_utils import get_users_for_sync

      from oraculum import CACHE

  

-     # Find out what db line id to start with

-     start_point = WatchDogStartPoint.query.first()

- 

-     if not start_point:

-         start_point = WatchDogStartPoint(0)

-         db.session.add(start_point)

-         db.session.commit()

-     start_point = start_point.last_seen_id

- 

      # Prepare users and packages who meet criteria for sync

      cached_users = get_users_for_sync()

      cached_packages = set()
@@ -69,7 +67,15 @@ 

      # Prepare data from db for watchdog emailing

      data_too_old = CachedData.query.filter(CachedData.time_created <= cache_cutoff).all()

  

-     # WatchDog Table

+     # Find out what db line id to start with

+     start_point = WatchDogStartPoint.query.first()

+ 

+     if not start_point:

+         start_point = WatchDogStartPoint(0)

+         db.session.add(start_point)

+         db.session.commit()

+     start_point = start_point.last_seen_id

+ 

      # Store last id we saw in the database to process it from there next time

      end_point = WatchDogData.query.order_by(WatchDogData.id.desc()).first()

      if not end_point:
@@ -77,16 +83,32 @@ 

          return

      end_point = end_point.id

  

-     sync_failures = WatchDogData.query.filter_by(kind="sync_failed").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

-     started_tasks = WatchDogData.query.filter_by(kind="sync_started").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

-     planned_tasks = WatchDogData.query.filter_by(kind="sync_planned").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

-     ended_tasks = WatchDogData.query.filter_by(kind="sync_ended").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

+     # WatchDog Table

+     sync_failures = WatchDogData.query.filter(WatchDogData.id > start_point).filter(WatchDogData.failed == True).all()

+ 

+     # NOTE: Do not change '==' and '!=' below as linter might suggest... it doesn't work

+     # started_rotting_tasks

+     q = WatchDogData.query.filter(WatchDogData.id > start_point)

+     q = q.filter(WatchDogData.id <= end_point)

+     q = q.filter(WatchDogData.ended_at == None)

+     q = q.filter(WatchDogData.started_at != None)

+     q = q.filter(WatchDogData.started_at <= datetime.datetime.utcnow() - datetime.timedelta(seconds=app.config["TASKS_ROT_AFTER"]))

+     started_rotting_tasks = q.all()

+ 

+     # planned_rotting_tasks

+     q = WatchDogData.query.filter(WatchDogData.id > start_point)

+     q = q.filter(WatchDogData.id <= end_point)

+     q = q.filter(WatchDogData.ended_at == None)

+     q = q.filter(WatchDogData.started_at == None)

+     q = q.filter(WatchDogData.planned_at <= datetime.datetime.utcnow() - datetime.timedelta(seconds=app.config["TASKS_ROT_AFTER"]))

+     planned_rotting_tasks = q.all()

+ 

  

      message = "<h1>Oraculum Failures Report</h1>\n"

      message += "<h3>Sync Failures</h3>"

      for sync_failure in sync_failures:

          message += "<b>What: </b>" + sync_failure.what + "<br />"

-         message += "<b>When: </b>" + str(sync_failure.when) + "<br />"

+         message += "<b>When: </b>" + str(sync_failure.ended_at) + "<br />"

          message += "<b>Message: </b><br />"

          message += sync_failure.message.replace("\n", "<br />")

          message += "<hr>"
@@ -107,26 +129,14 @@ 

          message += "<hr>"

  

      message += "<h3>Rotting Started Tasks</h3>"

-     for started_task in started_tasks:

-         # Filter out tasks that ended in some way (either fail or success, we are here just for the rotting ones)

-         if is_task_complete(started_task, ended_tasks) or is_task_complete(started_task, sync_failures):

-             continue

- 

-         # Find out tasks that were planned or started too long ago

-         if started_task.when + datetime.timedelta(seconds=app.config["TASKS_ROT_AFTER"]) <= datetime.datetime.utcnow():

-             message += "<b>What: </b>" + started_task.what + "<br />"

-             message += "<b>Start Time: </b>" + str(started_task.when) + "<br />"

+     for started_task in started_rotting_tasks:

+         message += "<b>What: </b>" + started_task.what + "<br />"

+         message += "<b>Start Time: </b>" + str(started_task.started_at) + "<br />"

  

      message += "<h3>Rotting Planned Tasks</h3>"

-     for planned_task in planned_tasks:

-         # Filter out tasks that ended in some way (either fail or success, we are here just for the rotting ones)

-         if is_task_complete(planned_task, ended_tasks) or is_task_complete(planned_task, sync_failures):

-             continue

- 

-         # Find out tasks that were planned or started too long ago

-         if planned_task.when + datetime.timedelta(seconds=app.config["TASKS_ROT_AFTER"]) <= datetime.datetime.utcnow():

-             message += "<b>What: </b>" + planned_task.what + "<br />"

-             message += "<b>Plan Time: </b>" + str(planned_task.when) + "<br />"

+     for planned_task in planned_rotting_tasks:

+         message += "<b>What: </b>" + planned_task.what + "<br />"

+         message += "<b>Plan Time: </b>" + str(planned_task.planned_at) + "<br />"

  

      try:

          if not app.config["SEND_ERROR_EMAILS"]:

no initial comment

1 new commit added

  • Fixup
3 years ago

1 new commit added

  • Migrations
3 years ago

The None at the end is not necessary

Isn't the commit & load unnecessary, since the newly created row will be committed about 14 lines lower?

From a pure logic/"performance" perspective, these all could/should be `elif`.

I'd rather see this named `q` than `query` — it would be less cumbersome to read IMO and could not be confused with SQLAlchemy's `query`.

I'm not sure what raw SQL is created here, but my guesstimate is that doing this (don't hold me to the bracket balance :)):

query.filter(WatchDogData.started_at  <= datetime.datetime.utcnow() - datetime.timedelta(seconds=app.config["TASKS_ROT_AFTER"]))

will be significantly faster, since it can be done as "compare these two numbers" instead of "for each row, perform a mathematical operation and then compare the result to this static number".

1 new commit added

  • Feedback
3 years ago

5 new commits added

  • Feedback
  • Migrations
  • Fixup
  • Tweaks
  • WIP
3 years ago

rebased onto ecf19e1

3 years ago

Pull-Request has been merged by frantisekz

3 years ago