From efeaa68374a647a91268fd6af3564d685d155376 Mon Sep 17 00:00:00 2001 From: Slavek Kabrda Date: Mar 16 2018 08:57:38 +0000 Subject: Make sure session is cleaned up even if task raises exception during execution --- diff --git a/pagure/lib/tasks.py b/pagure/lib/tasks.py index a5249b4..81bed6b 100644 --- a/pagure/lib/tasks.py +++ b/pagure/lib/tasks.py @@ -51,8 +51,10 @@ conn = Celery('tasks', broker=broker_url, backend=broker_url) conn.conf.update(pagure_config['CELERY_CONFIG']) -def set_status(function): - """ Simple decorator adjusting the status of the task when it starts. +def pagure_task(function): + """ Simple decorator that is responsible for: + * Adjusting the status of the task when it starts + * Creating and cleaning up a SQLAlchemy session """ @wraps(function) @@ -63,7 +65,12 @@ def set_status(function): self.update_state(state='RUNNING') except TypeError: pass - return function(self, *args, **kwargs) + session = pagure.lib.create_session(pagure_config['DB_URL']) + try: + return function(self, session, *args, **kwargs) + finally: + session.remove() + gc_clean() return decorated_function @@ -91,12 +98,14 @@ def gc_clean(): @conn.task(queue=pagure_config.get('GITOLITE_CELERY_QUEUE', None), bind=True) -@set_status +@pagure_task def generate_gitolite_acls( - self, namespace=None, name=None, user=None, group=None): + self, session, namespace=None, name=None, user=None, group=None): """ Generate the gitolite configuration file either entirely or for a specific project. + :arg session: SQLAlchemy session object + :type session: sqlalchemy.orm.session.Session :kwarg namespace: the namespace of the project :type namespace: None or str :kwarg name: the name of the project @@ -107,7 +116,6 @@ def generate_gitolite_acls( :type group: None or str """ - session = pagure.lib.create_session(pagure_config['DB_URL']) project = None if name and name != -1: project = pagure.lib._get_project( @@ -136,14 +144,12 @@ def generate_gitolite_acls( session.rollback() _log.exception( 'Failed to unmark read_only for: %s project', project) - session.remove() - gc_clean() @conn.task(queue=pagure_config.get('GITOLITE_CELERY_QUEUE', None), bind=True) -@set_status +@pagure_task def delete_project( - self, namespace=None, name=None, user=None, action_user=None): + self, session, namespace=None, name=None, user=None, action_user=None): """ Delete a project in pagure. This is achieved in three steps: @@ -151,6 +157,8 @@ def delete_project( - Remove the git repositories on disk - Remove the project from the DB + :arg session: SQLAlchemy session object + :type session: sqlalchemy.orm.session.Session :kwarg namespace: the namespace of the project :type namespace: None or str :kwarg name: the name of the project @@ -161,7 +169,6 @@ def delete_project( :type action_user: None or str """ - session = pagure.lib.create_session(pagure_config['DB_URL']) project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -220,19 +227,18 @@ def delete_project( session.rollback() _log.exception( 'Failed to delete project: %s from the DB', project.fullname) - session.remove() - - gc_clean() return ret('ui_ns.view_user', username=username) @conn.task(bind=True) -@set_status -def create_project(self, username, namespace, name, add_readme, +@pagure_task +def create_project(self, session, username, namespace, name, add_readme, ignore_existing_repo): """ Create a project. + :arg session: SQLAlchemy session object + :type session: sqlalchemy.orm.session.Session :kwarg username: the user creating the project :type user: str :kwarg namespace: the namespace of the project @@ -247,8 +253,6 @@ def create_project(self, username, namespace, name, add_readme, :type ignore_existing_repo: bool """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -352,20 +356,15 @@ def create_project(self, username, namespace, name, add_readme, user=project.user.user if project.is_fork else None) _log.info('Refreshing gitolite config queued in task: %s', task.id) - session.remove() - gc_clean() - return ret('ui_ns.view_repo', repo=name, namespace=namespace) @conn.task(bind=True) -@set_status -def update_git(self, name, namespace, user, ticketuid=None, requestuid=None): +@pagure_task +def update_git(self, session, name, namespace, user, ticketuid=None, requestuid=None): """ Update the JSON representation of either a ticket or a pull-request depending on the argument specified. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -385,19 +384,15 @@ def update_git(self, name, namespace, user, ticketuid=None, requestuid=None): result = pagure.lib.git._update_git(obj, project, folder) - session.remove() - gc_clean() return result @conn.task(bind=True) -@set_status -def clean_git(self, name, namespace, user, ticketuid): +@pagure_task +def clean_git(self, session, name, namespace, user, ticketuid): """ Remove the JSON representation of a ticket on the git repository for tickets. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -411,19 +406,16 @@ def clean_git(self, name, namespace, user, ticketuid): result = pagure.lib.git._clean_git(obj, project, folder) - session.remove() return result @conn.task(bind=True) -@set_status -def update_file_in_git(self, name, namespace, user, branch, branchto, +@pagure_task +def update_file_in_git(self, session, name, namespace, user, branch, branchto, filename, content, message, username, email, runhook=False): """ Update a file in the specified git repo. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - userobj = pagure.lib.search_user(session, username=username) project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, @@ -434,18 +426,15 @@ def update_file_in_git(self, name, namespace, user, branch, branchto, project, branch, branchto, filename, content, message, userobj, email, runhook=runhook) - session.remove() return ret('ui_ns.view_commits', repo=project.name, username=user, namespace=namespace, branchname=branchto) @conn.task(bind=True) -@set_status -def delete_branch(self, name, namespace, user, branchname): +@pagure_task +def delete_branch(self, session, name, namespace, user, branchname): """ Delete a branch from a git repo. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -460,14 +449,13 @@ def delete_branch(self, name, namespace, user, branchname): except pygit2.GitError as err: _log.exception(err) - session.remove() return ret( 'ui_ns.view_repo', repo=name, namespace=namespace, username=user) @conn.task(bind=True) -@set_status -def fork(self, name, namespace, user_owner, user_forker, editbranch, editfile): +@pagure_task +def fork(self, session, name, namespace, user_owner, user_forker, editbranch, editfile): """ Forks the specified project for the specified user. :arg namespace: the namespace of the project @@ -486,8 +474,6 @@ def fork(self, name, namespace, user_owner, user_forker, editbranch, editfile): :type editfile: str """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - repo_from = pagure.lib._get_project( session, namespace=namespace, name=name, user=user_owner, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -564,7 +550,6 @@ def fork(self, name, namespace, user_owner, user_forker, editbranch, editfile): ) del frepo - session.remove() _log.info('Project created, refreshing auth async') task = generate_gitolite_acls.delay( @@ -572,7 +557,6 @@ def fork(self, name, namespace, user_owner, user_forker, editbranch, editfile): name=repo_to.name, user=repo_to.user.user if repo_to.is_fork else None) _log.info('Refreshing gitolite config queued in task: %s', task.id) - gc_clean() if editfile is None: return ret('ui_ns.view_repo', repo=name, namespace=namespace, @@ -584,8 +568,8 @@ def fork(self, name, namespace, user_owner, user_forker, editbranch, editfile): @conn.task(bind=True) -@set_status -def pull_remote_repo(self, remote_git, branch_from): +@pagure_task +def pull_remote_repo(self, session, remote_git, branch_from): """ Clone a remote git repository locally for remote PRs. """ @@ -596,18 +580,15 @@ def pull_remote_repo(self, remote_git, branch_from): remote_git, clonepath, checkout_branch=branch_from) del repo - gc_clean() return clonepath @conn.task(bind=True) -@set_status -def refresh_remote_pr(self, name, namespace, user, requestid): +@pagure_task +def refresh_remote_pr(self, session, name, namespace, user, requestid): """ Refresh the local clone of a git repository used in a remote pull-request. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -625,38 +606,30 @@ def refresh_remote_pr(self, name, namespace, user, requestid): repo.pull(branch=request.branch_from, force=True) refresh_pr_cache.delay(name, namespace, user) - session.remove() del repo - gc_clean() return ret( 'ui_ns.request_pull', username=user, namespace=namespace, repo=name, requestid=requestid) @conn.task(bind=True) -@set_status -def refresh_pr_cache(self, name, namespace, user): +@pagure_task +def refresh_pr_cache(self, session, name, namespace, user): """ Refresh the merge status cached of pull-requests. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) pagure.lib.reset_status_pull_request(session, project) - session.remove() - gc_clean() - @conn.task(bind=True) -@set_status -def merge_pull_request(self, name, namespace, user, requestid, user_merger): +@pagure_task +def merge_pull_request(self, session, name, namespace, user, requestid, + user_merger): """ Merge pull-request. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -671,20 +644,16 @@ def merge_pull_request(self, name, namespace, user, requestid, user_merger): session, request, user_merger, pagure_config['REQUESTS_FOLDER']) refresh_pr_cache.delay(name, namespace, user) - session.remove() - gc_clean() return ret( 'ui_ns.view_repo', repo=name, username=user, namespace=namespace) @conn.task(bind=True) -@set_status -def add_file_to_git( - self, name, namespace, user, user_attacher, issueuid, filename): +@pagure_task +def add_file_to_git(self, session, name, namespace, user, user_attacher, + issueuid, filename): """ Add a file to the specified git repo. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -704,13 +673,10 @@ def add_file_to_git( user_attacher, filename) - session.remove() - gc_clean() - @conn.task(bind=True) -@set_status -def project_dowait(self, name, namespace, user): +@pagure_task +def project_dowait(self, session, name, namespace, user): """ This is a task used to test the locking systems. It should never be allowed to be called in production instances, since that @@ -718,8 +684,6 @@ def project_dowait(self, name, namespace, user): repeatedly. """ assert pagure_config.get('ALLOW_PROJECT_DOWAIT', False) - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -727,21 +691,16 @@ def project_dowait(self, name, namespace, user): with project.lock('WORKER'): time.sleep(10) - session.remove() - gc_clean() - return ret( 'ui_ns.view_repo', repo=name, username=user, namespace=namespace) @conn.task(bind=True) -@set_status -def sync_pull_ref(self, name, namespace, user, requestid): +@pagure_task +def sync_pull_ref(self, session, name, namespace, user, requestid): """ Synchronize a pull/ reference from the content in the forked repo, allowing local checkout of the pull-request. """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -765,13 +724,10 @@ def sync_pull_ref(self, name, namespace, user, requestid): repo_obj = pygit2.Repository(repopath) pagure.lib.git.update_pull_ref(request, repo_obj) - session.remove() - gc_clean() - @conn.task(bind=True) -@set_status -def update_checksums_file(self, folder, filenames): +@pagure_task +def update_checksums_file(self, session, folder, filenames): """ Update the checksums file in the release folder of the project. """ @@ -811,8 +767,8 @@ def update_checksums_file(self, folder, filenames): @conn.task(bind=True) -@set_status -def commits_author_stats(self, repopath): +@pagure_task +def commits_author_stats(self, session, repopath): """ Returns some statistics about commits made against the specified git repository. """ @@ -834,7 +790,6 @@ def commits_author_stats(self, repopath): author = commit.author.name stats[(author, email)] += 1 - session = pagure.lib.create_session(pagure_config['DB_URL']) for (name, email), val in stats.items(): # For each recorded user info, check if we know the e-mail address of # the user. @@ -845,7 +800,6 @@ def commits_author_stats(self, repopath): # merge them into one record. stats.pop((name, email)) stats[(user.fullname, user.default_email)] += val - session.close() # Generate a list of contributors ordered by how many commits they # authored. The list consists of tuples with number of commits and people @@ -869,8 +823,8 @@ def commits_author_stats(self, repopath): @conn.task(bind=True) -@set_status -def commits_history_stats(self, repopath): +@pagure_task +def commits_history_stats(self, session, repopath): """ Returns the evolution of the commits made against the specified git repository. """ diff --git a/pagure/lib/tasks_services.py b/pagure/lib/tasks_services.py index 6e73c63..5ef7a80 100644 --- a/pagure/lib/tasks_services.py +++ b/pagure/lib/tasks_services.py @@ -27,7 +27,7 @@ from sqlalchemy.exc import SQLAlchemyError import pagure.lib from pagure.config import config as pagure_config -from pagure.lib.tasks import set_status +from pagure.lib.tasks import pagure_task from pagure.mail_logging import format_callstack @@ -103,11 +103,13 @@ def call_web_hooks(project, topic, msg, urls): @conn.task(queue=pagure_config.get('WEBHOOK_CELERY_QUEUE', None), bind=True) -@set_status +@pagure_task def webhook_notification( - self, topic, msg, namespace=None, name=None, user=None): + self, session, topic, msg, namespace=None, name=None, user=None): """ Send webhook notifications about an event on that project. + :arg session: SQLAlchemy session object + :type session: sqlalchemy.orm.session.Session :arg topic: the topic for the notification :type topic: str :arg msg: the message to send via web-hook @@ -120,7 +122,6 @@ def webhook_notification( :type user: None or str """ - session = pagure.lib.create_session(pagure_config['DB_URL']) project = pagure.lib._get_project( session, namespace=namespace, name=name, user=user, case=pagure_config.get('CASE_SENSITIVE', False)) @@ -139,16 +140,17 @@ def webhook_notification( urls = urls.split('\n') _log.info('Got the project and urls, going to the webhooks') call_web_hooks(project, topic, msg, urls) - session.close() @conn.task(queue=pagure_config.get('LOGCOM_CELERY_QUEUE', None), bind=True) -@set_status +@pagure_task def log_commit_send_notifications( - self, name, commits, abspath, branch, default_branch, + self, session, name, commits, abspath, branch, default_branch, namespace=None, username=None): """ Send webhook notifications about an event on that project. + :arg session: SQLAlchemy session object + :type session: sqlalchemy.orm.session.Session :arg topic: the topic for the notification :type topic: str :arg msg: the message to send via web-hook @@ -161,8 +163,6 @@ def log_commit_send_notifications( :type user: None or str """ - session = pagure.lib.create_session(pagure_config['DB_URL']) - _log.info( 'Looking for project: %s%s of %s', '%s/' % namespace if namespace else '', @@ -194,8 +194,6 @@ def log_commit_send_notifications( except SQLAlchemyError as err: # pragma: no cover _log.exception(err) session.rollback() - finally: - session.close() def get_files_to_load(title, new_commits_list, abspath): @@ -225,9 +223,9 @@ def get_files_to_load(title, new_commits_list, abspath): @conn.task(queue=pagure_config.get('LOADJSON_CELERY_QUEUE', None), bind=True) -@set_status +@pagure_task def load_json_commits_to_db( - self, name, commits, abspath, data_type, agent, + self, session, name, commits, abspath, data_type, agent, namespace=None, username=None): ''' Loads into the database the specified commits that have been pushed to either the tickets or the pull-request repository. @@ -238,8 +236,6 @@ def load_json_commits_to_db( _log.info('LOADJSON: Invalid data_type retrieved: %s', data_type) return - session = pagure.lib.create_session(pagure_config['DB_URL']) - _log.info( 'LOADJSON: Looking for project: %s%s of user: %s', '%s/' % namespace if namespace else '', @@ -331,19 +327,15 @@ def load_json_commits_to_db( _log.exception('LOADJSON: Could not find user %s' % agent) except SQLAlchemyError as err: # pragma: no cover session.rollback() - finally: - session.close() _log.info('LOADJSON: Ready for another') @conn.task(queue=pagure_config.get('CI_CELERY_QUEUE', None), bind=True) -@set_status -def trigger_ci_build(self, pr_uid, pr_id, branch, ci_type): +@pagure_task +def trigger_ci_build(self, session, pr_uid, pr_id, branch, ci_type): ''' Triggers a new run of the CI system on the specified pull-request. ''' - session = pagure.lib.create_session(pagure_config['DB_URL']) - pagure.lib.plugins.get_plugin('Pagure CI') _log.info('Pagure-CI: Looking for PR: %s', pr_uid) @@ -403,5 +395,4 @@ def trigger_ci_build(self, pr_uid, pr_id, branch, ci_type): else: _log.warning('Pagure-CI:Un-supported CI type') - session.close() _log.info('Pagure-CI: Ready for another')