#46 Email Watchdog
Merged 3 years ago by frantisekz. Opened 3 years ago by frantisekz.

@@ -0,0 +1,45 @@ 

+ """WatchDog Table

+ 

+ Revision ID: 02a534c1cf42

+ Revises: f0b018ebc214

+ Create Date: 2020-06-30 15:26:58.904242

+ 

+ """

+ from alembic import op

+ import sqlalchemy as sa

+ 

+ 

+ # revision identifiers, used by Alembic.

+ revision = '02a534c1cf42'

+ down_revision = 'f0b018ebc214'

+ branch_labels = None

+ depends_on = None

+ 

+ 

+ def upgrade():

+     # ### commands auto generated by Alembic - please adjust! ###

+     op.create_table('watch_dog_data',

+     sa.Column('id', sa.Integer(), nullable=False),

+     sa.Column('kind', sa.String(length=16), nullable=True),

+     sa.Column('what', sa.Text(), nullable=True),

+     sa.Column('when', sa.DateTime(), nullable=True),

+     sa.Column('message', sa.Text(), nullable=True),

+     sa.PrimaryKeyConstraint('id')

+     )

+     op.create_index('idx_kind', 'watch_dog_data', ['kind'], unique=False, postgresql_ops={'kind': 'text_pattern_ops'})

+     op.create_index(op.f('ix_watch_dog_data_kind'), 'watch_dog_data', ['kind'], unique=False)

+     op.create_table('watch_dog_start_point',

+     sa.Column('id', sa.Integer(), nullable=False),

+     sa.Column('last_seen_id', sa.Integer(), nullable=True),

+     sa.PrimaryKeyConstraint('id')

+     )

+     # ### end Alembic commands ###

+ 

+ 

+ def downgrade():

+     # ### commands auto generated by Alembic - please adjust! ###

+     op.drop_table('watch_dog_start_point')

+     op.drop_index(op.f('ix_watch_dog_data_kind'), table_name='watch_dog_data')

+     op.drop_index('idx_kind', table_name='watch_dog_data')

+     op.drop_table('watch_dog_data')

+     # ### end Alembic commands ###

file modified
+15
@@ -61,3 +61,18 @@ 

  SYNC_INTERVALS["heart_beat"] = 10

  SYNC_INTERVALS["pagure_groups"] = 7200

  SYNC_INTERVALS["packager_dashboard_user_data"] = 7200

+ 

+ # Time (in seconds) after which tasks are considered to be in a "rotting" state

+ TASKS_ROT_AFTER = 7200

+ 

+ # FAILURE EMAILS

+ # To receive failures via email, set up the SMTP connection details below and set SEND_ERROR_EMAILS to True

+ SEND_ERROR_EMAILS = False

+ SMTP_SERVER = ""

+ SMTP_PORT = 465

+ SMTP_LOGIN = ""

+ SMTP_SENDER = ""

+ SMTP_PASSWORD = ""

+ 

+ # List of email addresses to send failures to

+ ADMIN_EMAILS = []

file modified
+15
@@ -89,6 +89,21 @@ 

      SYNC_INTERVALS["pagure_groups"] = 7200

      SYNC_INTERVALS["packager_dashboard_user_data"] = 7200

  

+     # Time (in seconds) after which tasks are considered to be in a "rotting" state

+     TASKS_ROT_AFTER = 7200

+ 

+     # FAILURE EMAILS

+     # To receive failures via email, set up the SMTP connection details below and set SEND_ERROR_EMAILS to True

+     SEND_ERROR_EMAILS = False

+     SMTP_SERVER = ""

+     SMTP_PORT = 465

+     SMTP_LOGIN = ""

+     SMTP_SENDER = ""

+     SMTP_PASSWORD = ""

+ 

+     # List of email addresses to send failures to

+     ADMIN_EMAILS = []

+ 

  

  class ProductionConfig(Config):

      PRODUCTION = True

@@ -0,0 +1,49 @@ 

+ #

+ # watchdog.py - Database model for Oraculum WatchDog

+ #

+ # Copyright 2020, Red Hat, Inc

+ #

+ # This program is free software; you can redistribute it and/or modify

+ # it under the terms of the GNU General Public License as published by

+ # the Free Software Foundation; either version 2 of the License, or

+ # (at your option) any later version.

+ #

+ # This program is distributed in the hope that it will be useful,

+ # but WITHOUT ANY WARRANTY; without even the implied warranty of

+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

+ # GNU General Public License for more details.

+ #

+ # You should have received a copy of the GNU General Public License along

+ # with this program; if not, write to the Free Software Foundation, Inc.,

+ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

+ #

+ # Authors:

+ #   Frantisek Zatloukal <fzatlouk@redhat.com>

+ 

+ from oraculum import db

+ 

+ from datetime import datetime

+ 

+ class WatchDogData(db.Model):

+     id = db.Column(db.Integer, primary_key=True)

+     kind = db.Column(db.String(16), unique=False, index=True)

+     what = db.Column(db.Text, unique=False)

+     when = db.Column(db.DateTime, unique=False)

+     message = db.Column(db.Text, unique=False)

+ 

+     __table_args__ = (

+         db.Index('idx_kind', 'kind', postgresql_ops={'kind': 'text_pattern_ops'},),

+     )

+ 

+     def __init__(self, kind, what, message):

+         self.kind = kind

+         self.what = what

+         self.when = datetime.utcnow()

+         self.message = message

+ 

+ class WatchDogStartPoint(db.Model):

+     id = db.Column(db.Integer, primary_key=True)

+     last_seen_id = db.Column(db.Integer, unique=False)

+ 

+     def __init__(self, last_seen_id):

+         self.last_seen_id = last_seen_id

@@ -26,7 +26,8 @@ 

  import oraculum

  from oraculum import app, celery_app, db

  from oraculum.models.db_cache import CachedData

- from oraculum.utils import celery_utils

+ from oraculum.models.watchdog import WatchDogData

+ from oraculum.utils import celery_utils, watchdog_utils

  

  from sqlalchemy import exc

  
@@ -139,6 +140,9 @@ 

              row = CachedData.query.filter_by(provider=what).first()

              data = row.data

  

+         # Store time of sync finish

+         watchdog_utils.push_to_watchdog("sync_ended", what, None)

+ 

          if app.config['ENABLE_LOCAL_CACHE']:

              self._local_cache[what] = CachedObject(row.time_created, data)

          return data

@@ -23,11 +23,13 @@ 

  

  import sys

  import datetime

+ import traceback

  

  import oraculum

  from oraculum import app, celery_app

  from oraculum.action_providers import ACTION_PROVIDERS

  from oraculum.models.dashboard_users import DashboardUserData

+ from oraculum.utils.watchdog_utils import push_to_watchdog

  

  @celery_app.task

  def plan_celery_refresh(priority, *args, **kwargs):
@@ -45,6 +47,7 @@ 

      counter = redis_conn.incr(what)

      if counter == 1:

          app.logger.debug("PLANNING - celery_refresh on %s" % what)

+         push_to_watchdog("sync_planned", what, None)

          celery_refresh.apply_async(args=args, kwargs=kwargs, priority=priority)

      else:

          app.logger.debug("celery_refresh already planned for %s " % what)
@@ -55,9 +58,11 @@ 

      app.logger.debug("START - celery_refresh on %s" % what)

      # run refresher

      try:

+         push_to_watchdog("sync_started", what, None)

          oraculum.CACHE._refresh(*args, **kwargs)

      except Exception as e:

          app.logger.error("ERROR - celery_refresh on %s\n%s" % (what, e))

+         push_to_watchdog("sync_failed", what, traceback.format_exc())

          #raise

      finally:

          # Cleanup counter to allow another scheduling

@@ -0,0 +1,43 @@ 

+ #

+ # mailing_utils.py - Functions for oraculum automated email sending

+ #

+ # Copyright 2020, Red Hat, Inc

+ #

+ # This program is free software; you can redistribute it and/or modify

+ # it under the terms of the GNU General Public License as published by

+ # the Free Software Foundation; either version 2 of the License, or

+ # (at your option) any later version.

+ #

+ # This program is distributed in the hope that it will be useful,

+ # but WITHOUT ANY WARRANTY; without even the implied warranty of

+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

+ # GNU General Public License for more details.

+ #

+ # You should have received a copy of the GNU General Public License along

+ # with this program; if not, write to the Free Software Foundation, Inc.,

+ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

+ #

+ # Authors:

+ #   Frantisek Zatloukal <fzatlouk@redhat.com>

+ 

+ import smtplib, ssl

+ import email.message

+ 

+ from oraculum import app

+ 

+ # Try to log in to the SMTP server and send the email

+ def send_mail(message):

+     msg = email.message.Message()

+     msg['Subject'] = 'Oraculum Failures'

+     msg.add_header('Content-Type','text/html')

+     msg.set_payload(message)

+     # Create a secure SSL context

+     context = ssl.create_default_context()

+     with smtplib.SMTP(app.config["SMTP_SERVER"], app.config["SMTP_PORT"]) as server:

+         server.ehlo()  # Can be omitted

+         server.starttls(context=context)

+         server.ehlo()  # Can be omitted

+         server.login(app.config["SMTP_LOGIN"], app.config["SMTP_PASSWORD"])

+         for admin_email in app.config["ADMIN_EMAILS"]:

+             server.sendmail(app.config["SMTP_SENDER"], admin_email, msg.as_string())

+         server.quit()

@@ -0,0 +1,140 @@ 

+ #

+ # watchdog_utils.py - Functions for oraculum WatchDog

+ #

+ # Copyright 2020, Red Hat, Inc

+ #

+ # This program is free software; you can redistribute it and/or modify

+ # it under the terms of the GNU General Public License as published by

+ # the Free Software Foundation; either version 2 of the License, or

+ # (at your option) any later version.

+ #

+ # This program is distributed in the hope that it will be useful,

+ # but WITHOUT ANY WARRANTY; without even the implied warranty of

+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

+ # GNU General Public License for more details.

+ #

+ # You should have received a copy of the GNU General Public License along

+ # with this program; if not, write to the Free Software Foundation, Inc.,

+ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

+ #

+ # Authors:

+ #   Frantisek Zatloukal <fzatlouk@redhat.com>

+ 

+ import re

+ import datetime

+ 

+ from oraculum.models.watchdog import WatchDogData, WatchDogStartPoint

+ from oraculum.models.db_cache import CachedData

+ from oraculum.utils.mailing_utils import send_mail

+ 

+ from oraculum import app, db

+ 

+ def push_to_watchdog(kind, what, message):

+     row = WatchDogData(kind, what, message)

+     db.session.add(row)

+     db.session.commit()

+ 

+ def is_task_complete(started_task, ended_tasks):

+     """

+     Finds out whether the given started_task has an ending counterpart in ended_tasks

+     """

+     for ended_task in ended_tasks:

+         if (started_task.what == ended_task.what) and (started_task.id < ended_task.id):

+             return True

+     return False

+ 

+ def process_queue():

+     from oraculum.utils.celery_utils import get_users_for_sync

+     from oraculum import CACHE

+ 

+     # Find out which db row id to start from

+     start_point = WatchDogStartPoint.query.first()

+ 

+     if not start_point:

+         start_point = WatchDogStartPoint(0)

+         db.session.add(start_point)

+         db.session.commit()

+     start_point = start_point.last_seen_id

+ 

+     # Prepare users and packages who meet criteria for sync

+     cached_users = get_users_for_sync()

+     cached_packages = set()

+     for cached_user in cached_users:

+         packages = CACHE.get("packager-dashboard_user_data_static", cached_user)["packages"]

+         cached_packages.update(packages)

+ 

+     # Calculate time for "how old is too old"

+     cache_cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=max(app.config["SYNC_INTERVALS"].values())) - datetime.timedelta(seconds=3600)

+ 

+     # Prepare data from db for watchdog emailing

+     data_too_old = CachedData.query.filter(CachedData.time_created <= cache_cutoff).all()

+ 

+     # WatchDog Table

+     # Find the newest id currently in the table; it is saved at the end so the next run continues from there

+     end_point = WatchDogData.query.order_by(WatchDogData.id.desc()).first()

+     if not end_point:

+         send_mail("No data in oraculum WatchDog")

+         return

+     end_point = end_point.id

+ 

+     sync_failures = WatchDogData.query.filter_by(kind="sync_failed").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

+     started_tasks = WatchDogData.query.filter_by(kind="sync_started").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

+     planned_tasks = WatchDogData.query.filter_by(kind="sync_planned").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

+     ended_tasks = WatchDogData.query.filter_by(kind="sync_ended").filter(WatchDogData.id > start_point).filter(WatchDogData.id <= end_point).all()

+ 

+     message = "<h1>Oraculum Failures Report</h1>\n"

+     message += "<h3>Sync Failures</h3>"

+     for sync_failure in sync_failures:

+         message += "<b>What: </b>" + sync_failure.what + "<br />"

+         message += "<b>When: </b>" + str(sync_failure.when) + "<br />"

+         message += "<b>Message: </b><br />"

+         message += sync_failure.message.replace("\n", "<br />")

+         message += "<hr>"

+ 

+     message += "<h3>Old data in DB</h3>"

+     for old_data in data_too_old:

+         # Filter data we don't cache (inactive users and their packages)

+         if "packager-dashboard_user_data_static" in old_data.provider:

+             # The regexp below extracts the argument from the cached provider string ("what"),

+             # e.g. packager-dashboard_bugs [('fedora-easy-karma',), {}] -> 'fedora-easy-karma'

+             if re.search(r"\[\('(.*)',\)", old_data.provider).groups()[0] not in cached_users:

+                 continue

+         if "packager-dashboard_pull_requests" in old_data.provider or "packager-dashboard_bugs" in old_data.provider:

+             if re.search(r"\[\('(.*)',\)", old_data.provider).groups()[0] not in cached_packages:

+                 continue

+         message += "<b>What: </b>" + old_data.provider + "<br />"

+         message += "<b>Last Sync: </b>" + str(old_data.time_created) + "<br />"

+         message += "<hr>"

+ 

+     message += "<h3>Rotting Started Tasks</h3>"

+     for started_task in started_tasks:

+         # Filter out tasks that ended in some way (either fail or success, we are here just for the rotting ones)

+         if is_task_complete(started_task, ended_tasks) or is_task_complete(started_task, sync_failures):

+             continue

+ 

+         # Find out tasks that were planned or started too long ago

+         if started_task.when + datetime.timedelta(seconds=app.config["TASKS_ROT_AFTER"]) <= datetime.datetime.utcnow():

+             message += "<b>What: </b>" + started_task.what + "<br />"

+             message += "<b>Start Time: </b>" + str(started_task.when) + "<br />"

+ 

+     message += "<h3>Rotting Planned Tasks</h3>"

+     for planned_task in planned_tasks:

+         # Filter out tasks that ended in some way (either fail or success, we are here just for the rotting ones)

+         if is_task_complete(planned_task, ended_tasks) or is_task_complete(planned_task, sync_failures):

+             continue

+ 

+         # Find out tasks that were planned or started too long ago

+         if planned_task.when + datetime.timedelta(seconds=app.config["TASKS_ROT_AFTER"]) <= datetime.datetime.utcnow():

+             message += "<b>What: </b>" + planned_task.what + "<br />"

+             message += "<b>Plan Time: </b>" + str(planned_task.when) + "<br />"

+ 

+     if not app.config["SEND_ERROR_EMAILS"]:

+         return

+     try:

+         send_mail(message)

+         # Insert last processed id into the db so we can start from there next time

+         start_point = WatchDogStartPoint.query.first()

+         start_point.last_seen_id = end_point

+         db.session.commit()

+     except Exception:

+         app.logger.error("Watchdog: Mail report hasn't been sent due to mail error")

no initial comment

I'm not sure what the code below is supposed to do, but if the goal is removing a specific row from the database, then I'm fairly sure this is not the way of doing it :)

I'm guessing this will be executed periodically somehow?

rebased onto f37e948

3 years ago

I'm guessing this will be executed periodically somehow?

Yes, the idea is to execute this periodically. I am thinking that it might be better to expose this in the CLI and handle periodic execution outside of Celery (cron, maybe?) for maximum reliability, in case there are issues with Celery's periodic scheduling.
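
For illustration, a minimal sketch of what that could look like, assuming a Flask CLI command (the watchdog-report command name and the cron schedule are made up for the example):

    # Hypothetical sketch: expose the watchdog run as a Flask CLI command
    # so cron can trigger it independently of Celery's scheduling.
    from oraculum import app
    from oraculum.utils import watchdog_utils

    @app.cli.command("watchdog-report")
    def watchdog_report():
        """Process the WatchDog queue and email the failure report."""
        watchdog_utils.process_queue()

    # Example crontab entry, running the report every 6 hours:
    #   0 */6 * * * FLASK_APP=oraculum flask watchdog-report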

2 new commits added

  • Add watchdog for old data in db
  • WIP: Watchdog
3 years ago

1 new commit added

  • Rotting tasks watchdog
3 years ago

1 new commit added

  • Small fixes
3 years ago

Either change this to something self-explanatory, or add a comment explaining what this influences. Ideally both :)

1 new commit added

  • Whoopsie
3 years ago

maybe change the name to something more self-explanatory?

I'd rather see an empty list here

I'd honestly much rather see a separate table containing just this one value than this. Not that the solution doesn't work, but it's unnecessary obfuscation.

Might be worth changing this to db.String() with a defined length, and adding an index on top, since the SQL queries used later in the code basically use this as the primary filter for searching. Also, db.Text() seems unnecessary when you really do control all the values.
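
For reference, a minimal sketch of the suggested column definition, assuming the comment refers to the kind column (the 16-character limit matches what the final diff above ends up using):

    # Bounded string plus an index, since "kind" is the main filter in the WatchDog queries
    kind = db.Column(db.String(16), unique=False, index=True)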

I'm not much in favor of adding 'unnecessary' things to the try section, especially when (potentially) the other commit() can raise an IntegrityError that the except section does not account for.
Are you sure we want to store the sync-ended message only when the first commit() is successful? If so, please add a boolean variable to be used as a flag, and run these three lines later on if it was successful.
If we'd rather store the watchdog data every time the sync ends, then also move it out of the try block, but just run it unconditionally.
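
A minimal sketch of the flag-based variant described above; the store_refreshed_data() wrapper and the IntegrityError handling are simplified stand-ins for the actual cache-refresh code:

    from sqlalchemy import exc

    from oraculum import db
    from oraculum.utils import watchdog_utils

    # Hypothetical helper: only push "sync_ended" once the cache commit went through
    def store_refreshed_data(row, what):
        commit_succeeded = False
        try:
            db.session.add(row)
            db.session.commit()
            commit_succeeded = True
        except exc.IntegrityError:
            # Keep whatever is already in the cache if the commit is rejected
            db.session.rollback()

        if commit_succeeded:
            # Store time of sync finish only for the commit that succeeded
            watchdog_utils.push_to_watchdog("sync_ended", what, None)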

My gut feeling here is that this is way too inefficient. But since this really only runs every couple of hours, I think we're fine.
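
If this refers to the nested scan in is_task_complete() (an assumption), a cheaper variant could precompute the highest "ended" id per task name once and turn each check into a dict lookup:

    # Hypothetical sketch: index the ended tasks by name, then each
    # completeness check is a single lookup instead of a full scan.
    def build_completion_index(ended_tasks):
        latest_end = {}
        for ended_task in ended_tasks:
            latest_end[ended_task.what] = max(latest_end.get(ended_task.what, 0), ended_task.id)
        return latest_end

    def is_task_complete_indexed(started_task, latest_end):
        return latest_end.get(started_task.what, 0) > started_task.id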

1 new commit added

  • Feedback
3 years ago

I'd still add an index to the column.

You could ditch the id column, and only keep the last_seen_id, since you'll always have just one line in the db anyway.

Why did you decide not to use push_to_watchdog() here?

How about cached_packages.update(packages) instead of the for loop?

Thinking about this more, there is a strong possibility that this won't, in fact, work properly. Since the tasks are still running in the background, the 'last seen' id may well end up being some arbitrary number not reflected by the selections above.
The same goes for the queries themselves: since they are not atomic, you can (and probably will) get different slices of time/data in each one of them, causing "weird bugs" in the report.

I suggest two options
- load all the data from db with an ID larger than last_seen, split it into the separate lists here, on the backend, and save the last_id (see the sketch below)
- lock the table for writing, execute all the db queries, unlock the table
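
A rough sketch of the first option, fetching one consistent slice of the table and splitting it in Python (model and field names follow the diff above):

    # Hypothetical sketch: one query, then split the rows by kind on the backend
    rows = (WatchDogData.query
            .filter(WatchDogData.id > start_point)
            .order_by(WatchDogData.id)
            .all())

    by_kind = {"sync_failed": [], "sync_started": [], "sync_planned": [], "sync_ended": []}
    for row in rows:
        by_kind.setdefault(row.kind, []).append(row)

    end_point = rows[-1].id if rows else start_point
    sync_failures = by_kind["sync_failed"]
    started_tasks = by_kind["sync_started"]
    planned_tasks = by_kind["sync_planned"]
    ended_tasks = by_kind["sync_ended"]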

Wouldn't it make sense to also limit the data_too_old by the start_point somehow? IIUC this will basically always read "all the data from the DB that are older than X", and it will soon enough be quite a lot of data...

Would be nice to explain what the intent of the regexp is here :)

1 new commit added

  • .
3 years ago

7 new commits added

  • .
  • Feedback
  • Whoopsie
  • Small fixes
  • Rotting tasks watchdog
  • Add watchdog for old data in db
  • WIP: Watchdog
3 years ago

1 new commit added

  • Final touch
3 years ago

This most certainly is not a primary_key :D
I'd remove the unique and primary_key constraints, since they don't really make sense here IMO

Wouldn't it make sense to also limit the data_too_old by the start_point somehow? IIUC this will basically always read "all the data from the DB that are older than X", and it will soon enough be quite a lot of data...

Thinking about this more, there is a strong possibility that this won't, in fact, work properly. Since the tasks are still running in the background, the 'last seen' id may well end up being some arbitrary number not reflected by the selections above.
The same goes for the queries themselves: since they are not atomic, you can (and probably will) get different slices of time/data in each one of them, causing "weird bugs" in the report.

I suggest two options
- load all the data from db with an ID larger than last_seen, and split it into the separate lists here, on backend, and save the last_id
- lock the table for writing, execute all the db queries, unlock the table

Would be nice to explain what the intent of the regexp is - ideally with an example

mea culpa, I missed that this is working on top of a different table. Code looks good, but please restructure the code so it's not so easy to miss this detail :)

1 new commit added

  • Moar polish
3 years ago

rebased onto 8dde861

3 years ago

Pull-Request has been merged by frantisekz

3 years ago