From 1436e8421247faf7ea7b8d8b371844b407b04347 Mon Sep 17 00:00:00 2001 From: František Zatloukal Date: Nov 15 2020 15:55:24 +0000 Subject: [PATCH 1/3] WIP: Store counters in the database instead of Redis --- diff --git a/alembic/versions/708ec1762eed_db_counters.py b/alembic/versions/708ec1762eed_db_counters.py new file mode 100644 index 0000000..7ebe87d --- /dev/null +++ b/alembic/versions/708ec1762eed_db_counters.py @@ -0,0 +1,34 @@ +"""DB counters + +Revision ID: 708ec1762eed +Revises: 2ac7e20e2b30 +Create Date: 2020-11-15 12:26:33.890125 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '708ec1762eed' +down_revision = '2ac7e20e2b30' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('counters', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('what', sa.Text(), nullable=True), + sa.Column('counter', sa.Integer(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('what') + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('counters') + # ### end Alembic commands ### diff --git a/oraculum/models/counters.py b/oraculum/models/counters.py new file mode 100644 index 0000000..b19a47f --- /dev/null +++ b/oraculum/models/counters.py @@ -0,0 +1,33 @@ +# +# counters.py - Database model for Oraculum Counters +# +# Copyright 2020, Red Hat, Inc +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Authors: +# Frantisek Zatloukal + +from oraculum import db + + +class Counters(db.Model): + id = db.Column(db.Integer, primary_key=True) + what = db.Column(db.Text, unique=True) + counter = db.Column(db.Integer) + + def __init__(self, what, counter=0): + self.counter = counter + self.what = what diff --git a/oraculum/models/db_cache.py b/oraculum/models/db_cache.py index 6ca0e5a..4203c6d 100644 --- a/oraculum/models/db_cache.py +++ b/oraculum/models/db_cache.py @@ -1,5 +1,5 @@ # -# landing_page.py - Database model for landing page +# db_cache.py - Database model for Oraculum Cache Data # # Copyright 2020, Red Hat, Inc # diff --git a/oraculum/utils/celery_utils.py b/oraculum/utils/celery_utils.py index 79525d6..b4fcebf 100644 --- a/oraculum/utils/celery_utils.py +++ b/oraculum/utils/celery_utils.py @@ -21,14 +21,17 @@ # Frantisek Zatloukal # Josef Skladanka +import sqlalchemy.exc + import sys import datetime import traceback import oraculum -from oraculum import app, celery_app +from oraculum import app, celery_app, db from oraculum.action_providers import ACTION_PROVIDERS from oraculum.models.dashboard_users import DashboardUserData +from oraculum.models.counters import Counters from oraculum.utils.watchdog_utils import push_to_watchdog, process_queue @celery_app.task @@ -43,8 +46,21 @@ def plan_celery_refresh(priority, *args, **kwargs): what = oraculum.CACHE._construct_what(*args, **kwargs) - redis_conn = celery_app.broker_connection().default_channel.client - counter = redis_conn.incr(what) + try: + counter = Counters.query.with_for_update().filter_by(what=what).first() + except sqlalchemy.exc.OperationalError: + app.logger.debug("celery_refresh already planned for %s " % what) + return + + if counter: + counter.counter = counter.counter + 1 + else: + counter = (Counters(what, 1)) + db.session.add(counter) + db.session.commit() + + counter = counter.counter + if counter == 1: app.logger.debug("PLANNING - celery_refresh on %s" % what) push_to_watchdog("sync_planned", what) @@ -66,8 +82,9 @@ def celery_refresh(*args, **kwargs): #raise finally: # Cleanup counter to allow another scheduling - redis_conn = celery_app.broker_connection().default_channel.client - counter = redis_conn.getset(what, 0) + counter = Counters.query.with_for_update().filter_by(what=what).first() + counter.counter = 0 + db.session.commit() app.logger.debug("END - celery_refresh on %s" % what) diff --git a/oraculum/utils/watchdog_utils.py b/oraculum/utils/watchdog_utils.py index 5d4d695..dbc7b87 100644 --- a/oraculum/utils/watchdog_utils.py +++ b/oraculum/utils/watchdog_utils.py @@ -27,6 +27,7 @@ from sqlalchemy import desc from oraculum.models.watchdog import WatchDogData, WatchDogStartPoint from oraculum.models.db_cache import CachedData +from oraculum.models.counters import Counters from oraculum.utils.mailing_utils import send_mail from oraculum import app, celery_app, db @@ -55,9 +56,6 @@ def process_queue(): from oraculum.utils.celery_utils import get_users_for_sync from oraculum import CACHE - if app.config["ACTIVE_WATCHDOG"]: - redis_conn = celery_app.broker_connection().default_channel.client - # Prepare users and packages who meet criteria for sync cached_users = get_users_for_sync() cached_packages = set() @@ -149,7 +147,10 @@ def process_queue(): # Reset counter if ACTIVE_WATCHDOG if app.config["ACTIVE_WATCHDOG"]: - counter = redis_conn.getset(old_data.provider, 0) + counter = Counters.query.with_for_update().filter_by(what=old_data.provider).first() + if counter: + counter.counter = 0 + db.session.commit() try: if not app.config["SEND_ERROR_EMAILS"]: From b8be9d64a5497a118261b4f65175044355097a68 Mon Sep 17 00:00:00 2001 From: František Zatloukal Date: Nov 16 2020 08:21:49 +0000 Subject: [PATCH 2/3] More WatchDog guards for in-db counters --- diff --git a/oraculum/utils/watchdog_utils.py b/oraculum/utils/watchdog_utils.py index dbc7b87..18b78f2 100644 --- a/oraculum/utils/watchdog_utils.py +++ b/oraculum/utils/watchdog_utils.py @@ -22,6 +22,7 @@ import re import datetime +from typing import Counter from sqlalchemy import desc @@ -69,6 +70,15 @@ def process_queue(): # Prepare data from db for watchdog emailing data_too_old = CachedData.query.filter(CachedData.time_created <= cache_cutoff).all() + # Orphaned counters + if app.config["ACTIVE_WATCHDOG"]: + counters = Counters.query.filter(Counters.counter != 0).all() + for counter in counters: + cached_counterpart = CachedData.query.filter(CachedData.provider == counter.what).first() + if not cached_counterpart: + counter.counter = 0 + db.session.commit() + # Find out what db line id to start with start_point = WatchDogStartPoint.query.first() From 54663c35ef83a30e4502b041a5136aeb8ed81b0b Mon Sep 17 00:00:00 2001 From: František Zatloukal Date: Nov 16 2020 08:36:57 +0000 Subject: [PATCH 3/3] Remove now not needed try/except block --- diff --git a/oraculum/utils/celery_utils.py b/oraculum/utils/celery_utils.py index b4fcebf..a29baf5 100644 --- a/oraculum/utils/celery_utils.py +++ b/oraculum/utils/celery_utils.py @@ -21,8 +21,6 @@ # Frantisek Zatloukal # Josef Skladanka -import sqlalchemy.exc - import sys import datetime import traceback @@ -45,12 +43,7 @@ def plan_celery_refresh(priority, *args, **kwargs): priority = priorities.get(priority, priorities['medium']) what = oraculum.CACHE._construct_what(*args, **kwargs) - - try: - counter = Counters.query.with_for_update().filter_by(what=what).first() - except sqlalchemy.exc.OperationalError: - app.logger.debug("celery_refresh already planned for %s " % what) - return + counter = Counters.query.with_for_update().filter_by(what=what).first() if counter: counter.counter = counter.counter + 1