#2912 Move the webhook service to be a celery service
Merged 6 years ago by pingou. Opened 6 years ago by pingou.

@@ -1,191 +0,0 @@ 

- #!/usr/bin/env python

- 

- """

-  (c) 2015-2017 - Copyright Red Hat Inc

- 

-  Authors:

-    Pierre-Yves Chibon <pingou@pingoured.fr>

- 

- 

- This server listens to message sent via redis and send the corresponding

- web-hook request.

- 

- Using this mechanism, we no longer block the main application if the

- receiving end is offline or so.

- 

- """

- 

- from __future__ import print_function

- import datetime

- import hashlib

- import hmac

- import json

- import logging

- import os

- import requests

- import time

- import uuid

- 

- import six

- import trollius

- import trollius_redis

- 

- from kitchen.text.converters import to_bytes

- 

- import pagure

- import pagure.lib

- from pagure.exceptions import PagureEvException

- 

- 

- if 'PAGURE_CONFIG' not in os.environ \

-         and os.path.exists('/etc/pagure/pagure.cfg'):

-     print('Using configuration file `/etc/pagure/pagure.cfg`')

-     os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'

- 

- _config = pagure.config.config.reload_config()

- log = logging.getLogger(__name__)

- _i = 0

- 

- 

- def call_web_hooks(project, topic, msg, urls):

-     ''' Sends the web-hook notification. '''

-     log.info(

-         "Processing project: %s - topic: %s", project.fullname, topic)

-     log.debug('msg: %s', msg)

- 

-     # Send web-hooks notification

-     global _i

-     _i += 1

-     year = datetime.datetime.now().year

-     if isinstance(topic, six.text_type):

-         topic = to_bytes(topic, encoding='utf8', nonstring="passthru")

-     msg['pagure_instance'] = _config['APP_URL']

-     msg['project_fullname'] = project.fullname

-     msg = dict(

-         topic=topic.decode('utf-8'),

-         msg=msg,

-         timestamp=int(time.time()),

-         msg_id=str(year) + '-' + str(uuid.uuid4()),

-         i=_i,

-     )

- 

-     content = json.dumps(msg)

-     hashhex = hmac.new(

-         str(project.hook_token), content, hashlib.sha1).hexdigest()

-     hashhex256 = hmac.new(

-         str(project.hook_token), content, hashlib.sha256).hexdigest()

-     headers = {

-         'X-Pagure': _config['APP_URL'],

-         'X-Pagure-project': project.fullname,

-         'X-Pagure-Signature': hashhex,

-         'X-Pagure-Signature-256': hashhex256,

-         'X-Pagure-Topic': topic,

-         'Content-Type': 'application/json',

-     }

-     for url in urls:

-         url = url.strip()

-         log.info('Calling url %s' % url)

-         try:

-             req = requests.post(

-                 url,

-                 headers=headers,

-                 data=content,

-                 timeout=60,

-             )

-             if not req:

-                 log.info(

-                     'An error occured while querying: %s - '

-                     'Error code: %s' % (url, req.status_code))

-         except (requests.exceptions.RequestException, Exception) as err:

-             log.info(

-                 'An error occured while querying: %s - Error: %s' % (

-                     url, err))

- 

- 

- @trollius.coroutine

- def handle_messages():

-     host = _config.get('REDIS_HOST', '0.0.0.0')

-     port = _config.get('REDIS_PORT', 6379)

-     dbname = _config.get('REDIS_DB', 0)

-     connection = yield trollius.From(trollius_redis.Connection.create(

-         host=host, port=port, db=dbname))

- 

-     # Create subscriber.

-     subscriber = yield trollius.From(connection.start_subscribe())

- 

-     # Subscribe to channel.

-     yield trollius.From(subscriber.subscribe(['pagure.hook']))

- 

-     # Inside a while loop, wait for incoming events.

-     while True:

-         reply = yield trollius.From(subscriber.next_published())

-         log.info(

-             'Received: %s on channel: %s',

-             repr(reply.value), reply.channel)

-         data = json.loads(reply.value)

-         username = None

-         if data['project'].startswith('forks'):

-             username, projectname = data['project'].split('/', 2)[1:]

-         else:

-             projectname = data['project']

- 

-         namespace = None

-         if '/' in projectname:

-             namespace, projectname = projectname.split('/', 1)

- 

-         log.info(

-             'Searching %s/%s/%s' % (username, namespace, projectname))

-         session = pagure.lib.create_session(_config['DB_URL'])

-         project = pagure.lib._get_project(

-             session=session, name=projectname, user=username,

-             namespace=namespace,

-             case=_config.get('CASE_SENSITIVE', False))

-         if not project:

-             log.info('No project found with these criteria')

-             session.close()

-             continue

-         urls = project.settings.get('Web-hooks')

-         session.close()

-         if not urls:

-             log.info('No URLs set: %s' % urls)

-             continue

-         urls = urls.split('\n')

-         log.info('Got the project, going to the webhooks')

-         call_web_hooks(project, data['topic'], data['msg'], urls)

- 

- 

- def main():

-     server = None

-     try:

-         loop = trollius.get_event_loop()

-         tasks = [

-             trollius.async(handle_messages()),

-         ]

-         loop.run_until_complete(trollius.wait(tasks))

-         loop.run_forever()

-     except KeyboardInterrupt:

-         pass

-     except trollius.ConnectionResetError:

-         pass

- 

-     log.info("End Connection")

-     loop.close()

-     log.info("End")

- 

- 

- if __name__ == '__main__':

-     log = logging.getLogger("")

-     formatter = logging.Formatter(

-         "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")

- 

-     # setup console logging

-     log.setLevel(logging.DEBUG)

-     ch = logging.StreamHandler()

-     ch.setLevel(logging.DEBUG)

- 

-     aslog = logging.getLogger("asyncio")

-     aslog.setLevel(logging.DEBUG)

- 

-     ch.setFormatter(formatter)

-     log.addHandler(ch)

-     main()

file modified
+8 -8
@@ -16,7 +16,6 @@ 

  

  import datetime

  import hashlib

- import json

  import logging

  import urlparse

  import re
@@ -27,6 +26,7 @@ 

  

  import flask

  import pagure.lib

+ import pagure.lib.tasks_services

  from pagure.config import config as pagure_config

  

  
@@ -68,13 +68,13 @@ 

          fedmsg_publish(topic, msg)

  

      if redis and project and not project.private:

-         redis.publish(

-             'pagure.hook',

-             json.dumps({

-                 'project': project.fullname,

-                 'topic': topic,

-                 'msg': msg,

-             }))

+         pagure.lib.tasks_services.webhook_notification.delay(

+             topic=topic,

+             msg=msg,

+             namespace=project.namespace,

+             name=project.name,

+             user=project.user.username if project.is_fork else None,

+         )

  

  

  def _add_mentioned_users(emails, comment):

@@ -0,0 +1,141 @@ 

+ # -*- coding: utf-8 -*-

+ 

+ """

+  (c) 2018 - Copyright Red Hat Inc

+ 

+  Authors:

+    Pierre-Yves Chibon <pingou@pingoured.fr>

+ 

+ """

+ 

+ import datetime

+ import hashlib

+ import hmac

+ import json

+ import logging

+ import os

+ import os.path

+ import time

+ import uuid

+ 

+ 

+ import pygit2

+ import requests

+ import six

+ 

+ from celery import Celery

+ from kitchen.text.converters import to_bytes

+ 

+ import pagure.lib

+ from pagure.config import config as pagure_config

+ from pagure.lib.tasks import set_status

+ 

+ # logging.config.dictConfig(pagure_config.get('LOGGING') or {'version': 1})

+ _log = logging.getLogger(__name__)

+ _i = 0

+ 

+ 

+ if os.environ.get('PAGURE_BROKER_URL'):

+     broker_url = os.environ['PAGURE_BROKER_URL']

+ elif pagure_config.get('BROKER_URL'):

+     broker_url = pagure_config['BROKER_URL']

+ else:

+     broker_url = 'redis://%s' % pagure_config['REDIS_HOST']

+ 

+ conn = Celery('tasks', broker=broker_url, backend=broker_url)

+ conn.conf.update(pagure_config['CELERY_CONFIG'])

+ 

+ 

+ def call_web_hooks(project, topic, msg, urls):

+     ''' Sends the web-hook notification. '''

+     _log.info(

+         "Processing project: %s - topic: %s", project.fullname, topic)

+     _log.debug('msg: %s', msg)

+ 

+     # Send web-hooks notification

+     global _i

+     _i += 1

+     year = datetime.datetime.utcnow().year

+     if isinstance(topic, six.text_type):

+         topic = to_bytes(topic, encoding='utf8', nonstring="passthru")

+     msg['pagure_instance'] = pagure_config['APP_URL']

+     msg['project_fullname'] = project.fullname

+     msg = dict(

+         topic=topic.decode('utf-8'),

+         msg=msg,

+         timestamp=int(time.time()),

+         msg_id=str(year) + '-' + str(uuid.uuid4()),

+         i=_i,

+     )

+ 

+     content = json.dumps(msg)

+     hashhex = hmac.new(

+         str(project.hook_token), content, hashlib.sha1).hexdigest()

+     hashhex256 = hmac.new(

+         str(project.hook_token), content, hashlib.sha256).hexdigest()

+     headers = {

+         'X-Pagure': pagure_config['APP_URL'],

+         'X-Pagure-project': project.fullname,

+         'X-Pagure-Signature': hashhex,

+         'X-Pagure-Signature-256': hashhex256,

+         'X-Pagure-Topic': topic,

+         'Content-Type': 'application/json',

+     }

+     for url in urls:

+         url = url.strip()

+         _log.info('Calling url %s' % url)

+         try:

+             req = requests.post(

+                 url,

+                 headers=headers,

+                 data={'payload': content},

+                 timeout=60,

+             )

+             if not req:

+                 _log.info(

+                     'An error occured while querying: %s - '

+                     'Error code: %s' % (url, req.status_code))

+         except (requests.exceptions.RequestException, Exception) as err:

+             _log.info(

+                 'An error occured while querying: %s - Error: %s' % (

+                     url, err))

+ 

+ 

+ @conn.task(queue=pagure_config.get('WEBHOOK_CELERY_QUEUE', None), bind=True)

+ @set_status

+ def webhook_notification(

+         self, topic, msg, namespace=None, name=None, user=None):

+     """ Send webhook notifications about an event on that project.

+ 

+     :arg topic: the topic for the notification

+     :type topic: str

+     :arg msg: the message to send via web-hook

+     :type msg: str

+     :kwarg namespace: the namespace of the project

+     :type namespace: None or str

+     :kwarg name: the name of the project

+     :type name: None or str

+     :kwarg user: the user of the project, only set if the project is a fork

+     :type user: None or str

+ 

+     """

+     session = pagure.lib.create_session(pagure_config['DB_URL'])

+     project = pagure.lib._get_project(

+         session, namespace=namespace, name=name, user=user,

+         case=pagure_config.get('CASE_SENSITIVE', False))

+ 

+     if not project:

+         session.close()

+         raise RuntimeError(

+             'Project: %s/%s from user: %s not found in the DB' % (

+                 namespace, name, user))

+ 

+     urls = project.settings.get('Web-hooks')

+     if not urls:

+         _log.info('No URLs set: %s' % urls)

+         return

+ 

+     urls = urls.split('\n')

+     _log.info('Got the project and urls, going to the webhooks')

+     call_web_hooks(project, topic, msg, urls)

+     session.close()

Signed-off-by: Pierre-Yves Chibon <pingou@pingoured.fr>

are you going to put all the services in tasks_services.py?

It can but doesn't have to be

I'm not quite sure how to deal w/ the service and the package structure tbh.

Do we put all the "services" into one systemd service? Do we split them?
Same question for the rpm

rebased onto 72c82010783e8a34266100aba1e9b74014eabc28

6 years ago

I'm not quite sure how to deal w/ the service and the package structure tbh.
Do we put all the "services" into one systemd service? Do we split them?
Same question for the rpm

Maybe a good compromise would be to have different systemd services but only 1 rpm. Keeping different services would make it easier to restart a particular service if needed.

That said, I am not quite sure how you would do that with celery.

That said, I am not quite sure how you would do that with celery.

Using multiple queues, which iirc is what I started to do here :)

I'm thinking to start migrating more services to celery and solve the packaging structure in a later PR.

Thoughts?

rebased onto 7937d22

6 years ago

Thanks, let's merge then :)

Pull-Request has been merged by pingou

6 years ago