From 7937d22e5e62221eddabe2ffb9c196ed50256c55 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Feb 07 2018 08:53:29 +0000 Subject: Move the webhook service to be a celery service Signed-off-by: Pierre-Yves Chibon --- diff --git a/pagure-webhook/pagure-webhook-server.py b/pagure-webhook/pagure-webhook-server.py deleted file mode 100644 index 42b245c..0000000 --- a/pagure-webhook/pagure-webhook-server.py +++ /dev/null @@ -1,191 +0,0 @@ -#!/usr/bin/env python - -""" - (c) 2015-2017 - Copyright Red Hat Inc - - Authors: - Pierre-Yves Chibon - - -This server listens to message sent via redis and send the corresponding -web-hook request. - -Using this mechanism, we no longer block the main application if the -receiving end is offline or so. - -""" - -from __future__ import print_function -import datetime -import hashlib -import hmac -import json -import logging -import os -import requests -import time -import uuid - -import six -import trollius -import trollius_redis - -from kitchen.text.converters import to_bytes - -import pagure -import pagure.lib -from pagure.exceptions import PagureEvException - - -if 'PAGURE_CONFIG' not in os.environ \ - and os.path.exists('/etc/pagure/pagure.cfg'): - print('Using configuration file `/etc/pagure/pagure.cfg`') - os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' - -_config = pagure.config.config.reload_config() -log = logging.getLogger(__name__) -_i = 0 - - -def call_web_hooks(project, topic, msg, urls): - ''' Sends the web-hook notification. ''' - log.info( - "Processing project: %s - topic: %s", project.fullname, topic) - log.debug('msg: %s', msg) - - # Send web-hooks notification - global _i - _i += 1 - year = datetime.datetime.now().year - if isinstance(topic, six.text_type): - topic = to_bytes(topic, encoding='utf8', nonstring="passthru") - msg['pagure_instance'] = _config['APP_URL'] - msg['project_fullname'] = project.fullname - msg = dict( - topic=topic.decode('utf-8'), - msg=msg, - timestamp=int(time.time()), - msg_id=str(year) + '-' + str(uuid.uuid4()), - i=_i, - ) - - content = json.dumps(msg) - hashhex = hmac.new( - str(project.hook_token), content, hashlib.sha1).hexdigest() - hashhex256 = hmac.new( - str(project.hook_token), content, hashlib.sha256).hexdigest() - headers = { - 'X-Pagure': _config['APP_URL'], - 'X-Pagure-project': project.fullname, - 'X-Pagure-Signature': hashhex, - 'X-Pagure-Signature-256': hashhex256, - 'X-Pagure-Topic': topic, - 'Content-Type': 'application/json', - } - for url in urls: - url = url.strip() - log.info('Calling url %s' % url) - try: - req = requests.post( - url, - headers=headers, - data=content, - timeout=60, - ) - if not req: - log.info( - 'An error occured while querying: %s - ' - 'Error code: %s' % (url, req.status_code)) - except (requests.exceptions.RequestException, Exception) as err: - log.info( - 'An error occured while querying: %s - Error: %s' % ( - url, err)) - - -@trollius.coroutine -def handle_messages(): - host = _config.get('REDIS_HOST', '0.0.0.0') - port = _config.get('REDIS_PORT', 6379) - dbname = _config.get('REDIS_DB', 0) - connection = yield trollius.From(trollius_redis.Connection.create( - host=host, port=port, db=dbname)) - - # Create subscriber. - subscriber = yield trollius.From(connection.start_subscribe()) - - # Subscribe to channel. - yield trollius.From(subscriber.subscribe(['pagure.hook'])) - - # Inside a while loop, wait for incoming events. - while True: - reply = yield trollius.From(subscriber.next_published()) - log.info( - 'Received: %s on channel: %s', - repr(reply.value), reply.channel) - data = json.loads(reply.value) - username = None - if data['project'].startswith('forks'): - username, projectname = data['project'].split('/', 2)[1:] - else: - projectname = data['project'] - - namespace = None - if '/' in projectname: - namespace, projectname = projectname.split('/', 1) - - log.info( - 'Searching %s/%s/%s' % (username, namespace, projectname)) - session = pagure.lib.create_session(_config['DB_URL']) - project = pagure.lib._get_project( - session=session, name=projectname, user=username, - namespace=namespace, - case=_config.get('CASE_SENSITIVE', False)) - if not project: - log.info('No project found with these criteria') - session.close() - continue - urls = project.settings.get('Web-hooks') - session.close() - if not urls: - log.info('No URLs set: %s' % urls) - continue - urls = urls.split('\n') - log.info('Got the project, going to the webhooks') - call_web_hooks(project, data['topic'], data['msg'], urls) - - -def main(): - server = None - try: - loop = trollius.get_event_loop() - tasks = [ - trollius.async(handle_messages()), - ] - loop.run_until_complete(trollius.wait(tasks)) - loop.run_forever() - except KeyboardInterrupt: - pass - except trollius.ConnectionResetError: - pass - - log.info("End Connection") - loop.close() - log.info("End") - - -if __name__ == '__main__': - log = logging.getLogger("") - formatter = logging.Formatter( - "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") - - # setup console logging - log.setLevel(logging.DEBUG) - ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) - - aslog = logging.getLogger("asyncio") - aslog.setLevel(logging.DEBUG) - - ch.setFormatter(formatter) - log.addHandler(ch) - main() diff --git a/pagure/lib/notify.py b/pagure/lib/notify.py index e744d14..e153976 100644 --- a/pagure/lib/notify.py +++ b/pagure/lib/notify.py @@ -16,7 +16,6 @@ from __future__ import print_function import datetime import hashlib -import json import logging import urlparse import re @@ -27,6 +26,7 @@ from email.mime.text import MIMEText import flask import pagure.lib +import pagure.lib.tasks_services from pagure.config import config as pagure_config @@ -68,13 +68,13 @@ def log(project, topic, msg, redis=None): fedmsg_publish(topic, msg) if redis and project and not project.private: - redis.publish( - 'pagure.hook', - json.dumps({ - 'project': project.fullname, - 'topic': topic, - 'msg': msg, - })) + pagure.lib.tasks_services.webhook_notification.delay( + topic=topic, + msg=msg, + namespace=project.namespace, + name=project.name, + user=project.user.username if project.is_fork else None, + ) def _add_mentioned_users(emails, comment): diff --git a/pagure/lib/tasks_services.py b/pagure/lib/tasks_services.py new file mode 100644 index 0000000..0098f87 --- /dev/null +++ b/pagure/lib/tasks_services.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- + +""" + (c) 2018 - Copyright Red Hat Inc + + Authors: + Pierre-Yves Chibon + +""" + +import datetime +import hashlib +import hmac +import json +import logging +import os +import os.path +import time +import uuid + + +import pygit2 +import requests +import six + +from celery import Celery +from kitchen.text.converters import to_bytes + +import pagure.lib +from pagure.config import config as pagure_config +from pagure.lib.tasks import set_status + +# logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1}) +_log = logging.getLogger(__name__) +_i = 0 + + +if os.environ.get('PAGURE_BROKER_URL'): + broker_url = os.environ['PAGURE_BROKER_URL'] +elif pagure_config.get('BROKER_URL'): + broker_url = pagure_config['BROKER_URL'] +else: + broker_url = 'redis://%s' % pagure_config['REDIS_HOST'] + +conn = Celery('tasks', broker=broker_url, backend=broker_url) +conn.conf.update(pagure_config['CELERY_CONFIG']) + + +def call_web_hooks(project, topic, msg, urls): + ''' Sends the web-hook notification. ''' + _log.info( + "Processing project: %s - topic: %s", project.fullname, topic) + _log.debug('msg: %s', msg) + + # Send web-hooks notification + global _i + _i += 1 + year = datetime.datetime.utcnow().year + if isinstance(topic, six.text_type): + topic = to_bytes(topic, encoding='utf8', nonstring="passthru") + msg['pagure_instance'] = pagure_config['APP_URL'] + msg['project_fullname'] = project.fullname + msg = dict( + topic=topic.decode('utf-8'), + msg=msg, + timestamp=int(time.time()), + msg_id=str(year) + '-' + str(uuid.uuid4()), + i=_i, + ) + + content = json.dumps(msg) + hashhex = hmac.new( + str(project.hook_token), content, hashlib.sha1).hexdigest() + hashhex256 = hmac.new( + str(project.hook_token), content, hashlib.sha256).hexdigest() + headers = { + 'X-Pagure': pagure_config['APP_URL'], + 'X-Pagure-project': project.fullname, + 'X-Pagure-Signature': hashhex, + 'X-Pagure-Signature-256': hashhex256, + 'X-Pagure-Topic': topic, + 'Content-Type': 'application/json', + } + for url in urls: + url = url.strip() + _log.info('Calling url %s' % url) + try: + req = requests.post( + url, + headers=headers, + data={'payload': content}, + timeout=60, + ) + if not req: + _log.info( + 'An error occured while querying: %s - ' + 'Error code: %s' % (url, req.status_code)) + except (requests.exceptions.RequestException, Exception) as err: + _log.info( + 'An error occured while querying: %s - Error: %s' % ( + url, err)) + + +@conn.task(queue=pagure_config.get('WEBHOOK_CELERY_QUEUE', None), bind=True) +@set_status +def webhook_notification( + self, topic, msg, namespace=None, name=None, user=None): + """ Send webhook notifications about an event on that project. + + :arg topic: the topic for the notification + :type topic: str + :arg msg: the message to send via web-hook + :type msg: str + :kwarg namespace: the namespace of the project + :type namespace: None or str + :kwarg name: the name of the project + :type name: None or str + :kwarg user: the user of the project, only set if the project is a fork + :type user: None or str + + """ + session = pagure.lib.create_session(pagure_config['DB_URL']) + project = pagure.lib._get_project( + session, namespace=namespace, name=name, user=user, + case=pagure_config.get('CASE_SENSITIVE', False)) + + if not project: + session.close() + raise RuntimeError( + 'Project: %s/%s from user: %s not found in the DB' % ( + namespace, name, user)) + + urls = project.settings.get('Web-hooks') + if not urls: + _log.info('No URLs set: %s' % urls) + return + + urls = urls.split('\n') + _log.info('Got the project and urls, going to the webhooks') + call_web_hooks(project, topic, msg, urls) + session.close()