From bd02026b77a121750f3884a760257a73f6220ea4 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Feb 12 2018 10:38:47 +0000 Subject: Migrate the logcom service to be celery based and triggered This is another service that we can drop, simplifying both the architecture as well as maintenance. In addition, it will make python3 support easier since celery supports python3 while trollius doesn't and is even deprecated actually. Signed-off-by: Pierre-Yves Chibon --- diff --git a/pagure-logcom/README.rst b/pagure-logcom/README.rst deleted file mode 100644 index b60fb19..0000000 --- a/pagure-logcom/README.rst +++ /dev/null @@ -1,12 +0,0 @@ -Pagure LogCom -============= - -This is the service logging in the user's commits to be displayed in the -database. -This service is triggered by a git hook, sending a notification that a push -happened. This service receive the notification and goes over all the commit -that got pushed and logs the activity corresponding to that user. - - * Run:: - - PAGURE_CONFIG=/path/to/config PYTHONPATH=. python pagure-logcom/pagure_logcom_server.py diff --git a/pagure-logcom/pagure_logcom_server.py b/pagure-logcom/pagure_logcom_server.py deleted file mode 100644 index ce45ef6..0000000 --- a/pagure-logcom/pagure_logcom_server.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" - (c) 2016-2017 - Copyright Red Hat Inc - - Authors: - Pierre-Yves Chibon - - -This server listens to message sent via redis post commits and log the -user's activity in the database. - -Using this mechanism, we no longer need to block the git push until all the -activity has been logged (which is you push the kernel tree for the first -time can be really time-consuming). - -""" - -from __future__ import print_function -import json -import logging -import os -from sqlalchemy.exc import SQLAlchemyError - -import trollius -import trollius_redis - -import pagure -import pagure.lib - - -if 'PAGURE_CONFIG' not in os.environ \ - and os.path.exists('/etc/pagure/pagure.cfg'): - print('Using configuration file `/etc/pagure/pagure.cfg`') - os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' - -_config = pagure.config.config.reload_config() -_log = logging.getLogger(__name__) - - -@trollius.coroutine -def handle_messages(): - ''' Handles connecting to redis and acting upon messages received. - In this case, it means logging into the DB the commits specified in the - message for the default repo or sending commit notification emails. - - The currently accepted message format looks like: - - :: - - { - "project": { - "name": "foo", - "namespace": null, - "parent": null, - "username": { - "name": "user" - } - }, - "abspath": "/srv/git/repositories/pagure.git", - "commits": [ - "b7b4059c44d692d7df3227ce58ce01191e5407bd", - "f8d0899bb6654590ffdef66b539fd3b8cf873b35", - "9b6fdc48d3edab82d3de28953271ea52b0a96117" - ], - "branch": "master", - "default_branch": "master" - } - - ''' - - host = _config.get('REDIS_HOST', '0.0.0.0') - port = _config.get('REDIS_PORT', 6379) - dbname = _config.get('REDIS_DB', 0) - connection = yield trollius.From(trollius_redis.Connection.create( - host=host, port=port, db=dbname)) - - # Create subscriber. - subscriber = yield trollius.From(connection.start_subscribe()) - - # Subscribe to channel. - yield trollius.From(subscriber.subscribe(['pagure.logcom'])) - - # Inside a while loop, wait for incoming events. - while True: - reply = yield trollius.From(subscriber.next_published()) - _log.info( - 'Received: %s on channel: %s', - repr(reply.value), reply.channel) - data = json.loads(reply.value) - - commits = data['commits'] - abspath = data['abspath'] - branch = data['branch'] - default_branch = data['default_branch'] - repo = data['project']['name'] - username = data['project']['user']['name'] \ - if data['project']['parent'] else None - namespace = data['project']['namespace'] - - session = pagure.lib.create_session(_config['DB_URL']) - - _log.info('Looking for project: %s%s of %s', - '%s/' % namespace if namespace else '', - repo, username) - project = pagure.lib._get_project( - pagure.SESSION, repo, user=username, namespace=namespace, - case=_config.get('CASE_SENSITIVE', False)) - - if not project: - _log.info('No project found') - continue - - _log.info('Found project: %s', project.fullname) - - _log.info('Processing %s commits in %s', len(commits), abspath) - - # Only log commits when the branch is the default branch - if branch == default_branch: - pagure.lib.git.log_commits_to_db( - session, project, commits, abspath) - - # Notify subscribed users that there are new commits - pagure.lib.notify.notify_new_commits( - abspath, project, branch, commits) - - try: - session.commit() - except SQLAlchemyError as err: # pragma: no cover - session.rollback() - finally: - session.close() - _log.info('Ready for another') - - -def main(): - ''' Start the main async loop. ''' - - try: - loop = trollius.get_event_loop() - tasks = [ - trollius.async(handle_messages()), - ] - loop.run_until_complete(trollius.wait(tasks)) - loop.run_forever() - except KeyboardInterrupt: - pass - except trollius.ConnectionResetError: - pass - - _log.info("End Connection") - loop.close() - _log.info("End") - - -if __name__ == '__main__': - formatter = logging.Formatter( - "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") - - logging.basicConfig(level=logging.DEBUG) - - # setup console logging - _log.setLevel(logging.DEBUG) - shellhandler = logging.StreamHandler() - shellhandler.setLevel(logging.DEBUG) - - aslog = logging.getLogger("asyncio") - aslog.setLevel(logging.DEBUG) - aslog = logging.getLogger("trollius") - aslog.setLevel(logging.DEBUG) - - shellhandler.setFormatter(formatter) - _log.addHandler(shellhandler) - main() diff --git a/pagure/hooks/files/default_hook.py b/pagure/hooks/files/default_hook.py old mode 100755 new mode 100644 index 868e4aa..5e5647a --- a/pagure/hooks/files/default_hook.py +++ b/pagure/hooks/files/default_hook.py @@ -5,7 +5,6 @@ """ from __future__ import print_function -import json import os import sys @@ -17,15 +16,13 @@ import pagure.exceptions # noqa: E402 import pagure.lib.link # noqa: E402 import pagure.lib.tasks # noqa: E402 -from pagure.lib import REDIS # noqa: E402 - if 'PAGURE_CONFIG' not in os.environ \ and os.path.exists('/etc/pagure/pagure.cfg'): os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' -_config = pagure.config.config.reload_config() +_config = pagure.config.reload_config() abspath = os.path.abspath(os.environ['GIT_DIR']) @@ -39,8 +36,10 @@ def run_as_post_receive_hook(): print('user:', username) print('namespace:', namespace) + session = pagure.lib.create_session(_config['DB_URL']) + project = pagure.lib._get_project( - pagure.SESSION, repo, user=username, namespace=namespace, + session, repo, user=username, namespace=namespace, case=_config.get('CASE_SENSITIVE', False)) for line in sys.stdin: @@ -71,28 +70,21 @@ def run_as_post_receive_hook(): commits = pagure.lib.git.get_revs_between( oldrev, newrev, abspath, refname) - if REDIS: - if refname == default_branch: - print('Sending to redis to log activity and send commit ' - 'notification emails') - else: - print('Sending to redis to send commit notification emails') - # If REDIS is enabled, notify subscribed users that there are new - # commits to this project - REDIS.publish( - 'pagure.logcom', - json.dumps({ - 'project': project.to_json(public=True), - 'abspath': abspath, - 'branch': refname, - 'default_branch': default_branch, - 'commits': commits, - }) - ) + if refname == default_branch: + print('Sending to redis to log activity and send commit ' + 'notification emails') else: - print('Hook not configured to connect to pagure-logcom') - print('/!\ Commit notification emails will not be sent and ' - 'commits won\'t be logged') + print('Sending to redis to send commit notification emails') + + pagure.lib.tasks_services.log_commit_send_notifications.delay( + name=repo, + commits=commits, + abspath=abspath, + branch=refname, + default_branch=default_branch, + namespace=namespace, + username=username, + ) target_repo = project if project.is_fork: @@ -102,7 +94,7 @@ def run_as_post_receive_hook(): and target_repo.settings.get('pull_requests', True): print() prs = pagure.lib.search_pull_requests( - pagure.flask_app.SESSION, + session, project_id_from=project.id, status='Open', branch_from=refname, @@ -135,7 +127,7 @@ def run_as_post_receive_hook(): parent.user.user if parent.is_fork else None ) - pagure.SESSION.remove() + session.remove() def main(args): diff --git a/pagure/lib/tasks_services.py b/pagure/lib/tasks_services.py index df974ad..b79a2db 100644 --- a/pagure/lib/tasks_services.py +++ b/pagure/lib/tasks_services.py @@ -23,6 +23,7 @@ import six from celery import Celery from kitchen.text.converters import to_bytes +from sqlalchemy.exc import SQLAlchemyError import pagure.lib from pagure.config import config as pagure_config @@ -137,3 +138,59 @@ def webhook_notification( _log.info('Got the project and urls, going to the webhooks') call_web_hooks(project, topic, msg, urls) session.close() + + +@conn.task(queue=pagure_config.get('LOGCOM_CELERY_QUEUE', None), bind=True) +@set_status +def log_commit_send_notifications( + self, name, commits, abspath, branch, default_branch, + namespace=None, username=None): + """ Send webhook notifications about an event on that project. + + :arg topic: the topic for the notification + :type topic: str + :arg msg: the message to send via web-hook + :type msg: str + :kwarg namespace: the namespace of the project + :type namespace: None or str + :kwarg name: the name of the project + :type name: None or str + :kwarg user: the user of the project, only set if the project is a fork + :type user: None or str + + """ + session = pagure.lib.create_session(pagure_config['DB_URL']) + + _log.info( + 'Looking for project: %s%s of %s', + '%s/' % namespace if namespace else '', + name, + username) + project = pagure.lib._get_project( + session, name, user=username, namespace=namespace, + case=pagure_config.get('CASE_SENSITIVE', False)) + + if not project: + _log.info('No project found') + return + + _log.info('Found project: %s', project.fullname) + + _log.info('Processing %s commits in %s', len(commits), abspath) + + # Only log commits when the branch is the default branch + if branch == default_branch: + pagure.lib.git.log_commits_to_db( + session, project, commits, abspath) + + # Notify subscribed users that there are new commits + pagure.lib.notify.notify_new_commits( + abspath, project, branch, commits) + + try: + session.commit() + except SQLAlchemyError as err: # pragma: no cover + _log.exception(err) + session.rollback() + finally: + session.close()