From abb591ef77c1b83076952825d50f2ae5ae140e06 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 08 2018 09:52:10 +0000 Subject: Merge pagure-ci into the pagure's celery-based services By porting pagure-ci to be celery-based we drop the requirement on trollius giving us more flexibility to move to python3 and queuing systems such as rabbitmq (instead of redis atm). Signed-off-by: Pierre-Yves Chibon --- diff --git a/pagure-ci/pagure_ci_server.py b/pagure-ci/pagure_ci_server.py deleted file mode 100644 index 2a55d30..0000000 --- a/pagure-ci/pagure_ci_server.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" - (c) 2016 - Copyright Red Hat Inc - - Authors: - Pierre-Yves Chibon - - -This server listens to message sent via redis and send the corresponding -web-hook request. - -Using this mechanism, we no longer block the main application if the -receiving end is offline or so. - -""" - -from __future__ import print_function -import json -import logging -import os - -import jenkins -import trollius -import trollius_redis - -import pagure -import pagure.lib - - -if 'PAGURE_CONFIG' not in os.environ \ - and os.path.exists('/etc/pagure/pagure.cfg'): - print('Using configuration file `/etc/pagure/pagure.cfg`') - os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' - - -_log = logging.getLogger(__name__) -_config = pagure.config.config.reload_config() - - -@trollius.coroutine -def handle_messages(): - ''' Handles connecting to redis and acting upon messages received. - In this case, it means triggering a build on jenkins based on the - information provided. - ''' - - host = _config.get('REDIS_HOST', '0.0.0.0') - port = _config.get('REDIS_PORT', 6379) - dbname = _config.get('REDIS_DB', 0) - connection = yield trollius.From(trollius_redis.Connection.create( - host=host, port=port, db=dbname)) - - # Create subscriber. - subscriber = yield trollius.From(connection.start_subscribe()) - - # Subscribe to channel. - yield trollius.From(subscriber.subscribe(['pagure.ci'])) - - # Inside a while loop, wait for incoming events. - while True: - reply = yield trollius.From(subscriber.next_published()) - _log.info( - 'Received: %s on channel: %s', - repr(reply.value), reply.channel) - data = json.loads(reply.value) - - pr_id = data['pr']['id'] - pr_uid = data['pr']['uid'] - branch = data['pr']['branch_from'] - _log.info('Looking for PR: %s', pr_uid) - session = pagure.lib.create_session(_config['DB_URL']) - request = pagure.lib.get_request_by_uid(session, pr_uid) - - _log.info('PR retrieved: %s', request) - - if not request: - _log.warning( - 'No request could be found from the message %s', data) - session.close() - continue - - _log.info( - "Trigger on %s PR #%s from %s: %s", - request.project.fullname, pr_id, - request.project_from.fullname, branch) - - url = request.project.ci_hook.ci_url.rstrip('/') - - if data['ci_type'] == 'jenkins': - _log.info('Jenkins CI') - repo = '%s/%s' % ( - _config['GIT_URL_GIT'].rstrip('/'), - request.project_from.path) - - # Jenkins Base URL - base_url, name = url.split('/job/', 1) - jenkins_name = name.split('/', 1)[0] - - data = { - 'cause': pr_id, - 'REPO': repo, - 'BRANCH': branch - } - - server = jenkins.Jenkins(base_url) - _log.info('Triggering at: %s for: %s - data: %s' % ( - base_url, jenkins_name, data)) - try: - server.build_job( - name=jenkins_name, - parameters=data, - token=request.project.ci_hook.pagure_ci_token - ) - _log.info('Build triggered') - except Exception as err: - _log.info('An error occured: %s', err) - - else: - _log.warning('Un-supported CI type') - - session.close() - _log.info('Ready for another') - - -def main(): - ''' Start the main async loop. ''' - - try: - loop = trollius.get_event_loop() - tasks = [ - trollius.async(handle_messages()), - ] - loop.run_until_complete(trollius.wait(tasks)) - loop.run_forever() - except KeyboardInterrupt: - pass - except trollius.ConnectionResetError: - pass - - _log.info("End Connection") - loop.close() - _log.info("End") - - -if __name__ == '__main__': - formatter = logging.Formatter( - "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") - - logging.basicConfig(level=logging.DEBUG) - - # setup console logging - _log.setLevel(logging.DEBUG) - shellhandler = logging.StreamHandler() - shellhandler.setLevel(logging.DEBUG) - - aslog = logging.getLogger("asyncio") - aslog.setLevel(logging.DEBUG) - aslog = logging.getLogger("trollius") - aslog.setLevel(logging.DEBUG) - - shellhandler.setFormatter(formatter) - _log.addHandler(shellhandler) - main() diff --git a/pagure/lib/__init__.py b/pagure/lib/__init__.py index 502c046..81dc479 100644 --- a/pagure/lib/__init__.py +++ b/pagure/lib/__init__.py @@ -1253,13 +1253,17 @@ def add_pull_request_comment(session, request, commit, tree_id, filename, # Send notification to the CI server, if the comment added was a # notification and the PR is still open and project is not private - if notification and request.status == 'Open' \ - and PAGURE_CI and request.project.ci_hook\ + if notification \ + and request.status == 'Open' \ + and pagure_config.get('PAGURE_CI_SERVICES') \ + and request.project.ci_hook \ and not request.project.private: - REDIS.publish('pagure.ci', json.dumps({ - 'ci_type': request.project.ci_hook.ci_type, - 'pr': request.to_json(public=True, with_comments=False) - })) + pagure.lib.tasks_services.trigger_ci_build.delay( + pr_uid=request.uid, + pr_id=request.id, + branch=request.branch_from, + ci_type=request.project.ci_hook.ci_type + ) pagure.lib.notify.log( request.project, @@ -1271,13 +1275,16 @@ def add_pull_request_comment(session, request, commit, tree_id, filename, redis=REDIS, ) - if trigger_ci and comment.strip().lower() in trigger_ci: - # Send notification to the CI server - if REDIS and PAGURE_CI and request.project.ci_hook: - REDIS.publish('pagure.ci', json.dumps({ - 'ci_type': request.project.ci_hook.ci_type, - 'pr': request.to_json(public=True, with_comments=False) - })) + if trigger_ci \ + and comment.strip().lower() in trigger_ci \ + and pagure_config.get('PAGURE_CI_SERVICES') \ + and request.project.ci_hook: + pagure.lib.tasks_services.trigger_ci_build.delay( + pr_uid=request.uid, + pr_id=request.id, + branch=request.branch_from, + ci_type=request.project.ci_hook.ci_type + ) return 'Comment added' @@ -1748,12 +1755,15 @@ def new_pull_request(session, branch_from, ) # Send notification to the CI server - if REDIS and PAGURE_CI and request.project.ci_hook \ + if pagure_config.get('PAGURE_CI_SERVICES') \ + and request.project.ci_hook \ and not request.project.private: - REDIS.publish('pagure.ci', json.dumps({ - 'ci_type': request.project.ci_hook.ci_type, - 'pr': request.to_json(public=True, with_comments=False) - })) + pagure.lib.tasks_services.trigger_ci_build.delay( + pr_uid=request.uid, + pr_id=request.id, + branch=request.branch_from, + ci_type=request.project.ci_hook.ci_type + ) # Create the ref from the start tasks.sync_pull_ref.delay( diff --git a/pagure/lib/tasks_services.py b/pagure/lib/tasks_services.py index 267b82a..6e73c63 100644 --- a/pagure/lib/tasks_services.py +++ b/pagure/lib/tasks_services.py @@ -334,3 +334,74 @@ def load_json_commits_to_db( finally: session.close() _log.info('LOADJSON: Ready for another') + + +@conn.task(queue=pagure_config.get('CI_CELERY_QUEUE', None), bind=True) +@set_status +def trigger_ci_build(self, pr_uid, pr_id, branch, ci_type): + ''' Triggers a new run of the CI system on the specified pull-request. + + ''' + session = pagure.lib.create_session(pagure_config['DB_URL']) + + pagure.lib.plugins.get_plugin('Pagure CI') + + _log.info('Pagure-CI: Looking for PR: %s', pr_uid) + request = pagure.lib.get_request_by_uid(session, pr_uid) + + _log.info('Pagure-CI: PR retrieved: %s', request) + + if not request: + _log.warning( + 'Pagure-CI: No request could be found for the uid %s', pr_uid) + session.close() + return + + _log.info( + "Pagure-CI: Trigger on %s PR #%s from %s: %s", + request.project.fullname, pr_id, + request.project_from.fullname, branch) + + url = request.project.ci_hook.ci_url.rstrip('/') + + if ci_type == 'jenkins': + try: + import jenkins + except ImportError: + _log.error( + 'Pagure-CI: Failed to load the jenkins module, bailing') + return + + _log.info('Jenkins CI') + repo = '%s/%s' % ( + pagure_config['GIT_URL_GIT'].rstrip('/'), + request.project_from.path) + + # Jenkins Base URL + base_url, name = url.split('/job/', 1) + jenkins_name = name.rstrip('/').replace('/job/', '/') + + data = { + 'cause': pr_id, + 'REPO': repo, + 'BRANCH': branch + } + + server = jenkins.Jenkins(base_url) + _log.info('Pagure-CI: Triggering at: %s for: %s - data: %s' % ( + base_url, jenkins_name, data)) + try: + server.build_job( + name=jenkins_name, + parameters=data, + token=request.project.ci_hook.pagure_ci_token + ) + _log.info('Pagure-CI: Build triggered') + except Exception as err: + _log.info('Pagure-CI:An error occured: %s', err) + + else: + _log.warning('Pagure-CI:Un-supported CI type') + + session.close() + _log.info('Pagure-CI: Ready for another') diff --git a/tests/test_pagure_lib.py b/tests/test_pagure_lib.py index 29d62d1..13b3912 100644 --- a/tests/test_pagure_lib.py +++ b/tests/test_pagure_lib.py @@ -2568,7 +2568,7 @@ class PagureLibtests(tests.Modeltests): mock_redis.return_value = True self.test_new_pull_request() - self.assertEqual(mock_redis.publish.call_count, 1) + self.assertEqual(mock_redis.publish.call_count, 0) # Let's pretend we turned on the CI hook for the project project = pagure.lib._get_project(self.session, 'test') @@ -2600,7 +2600,7 @@ class PagureLibtests(tests.Modeltests): self.assertEqual(len(request.discussion), 0) self.assertEqual(len(request.comments), 1) self.assertEqual(request.score, 0) - self.assertEqual(mock_redis.publish.call_count, 4) + self.assertEqual(mock_redis.publish.call_count, 1) @patch('pagure.lib.notify.send_email') def test_add_pull_request_flag(self, mockemail):