From 00115db9434e0f12df949a94a671ae2f41dcbb4b Mon Sep 17 00:00:00 2001 From: Aurélien Bompard Date: Mar 23 2018 17:14:15 +0000 Subject: Tests: run the task synchronously --- diff --git a/tests/__init__.py b/tests/__init__.py index d6da102..c7d23db 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -36,7 +36,9 @@ from urlparse import urlparse, parse_qs import mock import pygit2 +import redis +from celery.app.task import EagerResult from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session @@ -78,7 +80,14 @@ TICKETS_FOLDER = %(tickets_folder)r DOCS_FOLDER = %(docs_folder)r ATTACHMENTS_FOLDER = '%(path)s/attachments' BROKER_URL = 'redis+socket://%(global_path)s/broker' +CELERY_CONFIG = { + "task_always_eager": True, +} """ +# The Celery docs warn against using task_always_eager: +# http://docs.celeryproject.org/en/latest/userguide/testing.html +# but that warning is only valid when testing the async nature of the task, not +# what the task actually does. LOG.info('BUILD_ID: %s', os.environ.get('BUILD_ID')) @@ -140,6 +149,8 @@ def user_set(APP, user): tests_state = { "path": tempfile.mkdtemp(prefix='pagure-tests-'), "broker": None, + "broker_client": None, + "results": {}, } @@ -176,6 +187,11 @@ def _populate_db(session): session.commit() +def store_eager_results(*args, **kwargs): + """A wrapper for EagerResult that stores the instance.""" + result = EagerResult(*args, **kwargs) + tests_state["results"][result.id] = result + return result def setUp(): @@ -198,10 +214,17 @@ def setUp(): broker.poll() if broker.returncode is not None: raise Exception('Broker failed to start') + tests_state["broker_client"] = redis.Redis(unix_socket_path=broker_url) + + # Store the EagerResults to be able to retrieve them later + tests_state["eg_patcher"] = mock.patch('celery.app.task.EagerResult') + eg_mock = tests_state["eg_patcher"].start() + eg_mock.side_effect = store_eager_results def tearDown(): tests_state["db_session"].close() + tests_state["eg_patcher"].stop() broker = tests_state["broker"] broker.kill() broker.wait() @@ -290,8 +313,12 @@ class SimplePagureTest(unittest.TestCase): self._app.logger.handlers = [] self.app = self._app.test_client() + self.gr_patcher = mock.patch('pagure.lib.tasks.get_result') + gr_mock = self.gr_patcher.start() + gr_mock.side_effect = lambda tid: tests_state["results"][tid] def tearDown(self): + self.gr_patcher.stop() self.session.rollback() self._clear_database() @@ -309,8 +336,8 @@ class SimplePagureTest(unittest.TestCase): del self._app def shortDescription(self): - doc = self.__str__() +": "+self._testMethodDoc - return doc or None + doc = self.__str__() + ": " + self._testMethodDoc + return doc or None def _prepare_db(self): self.dbpath = 'sqlite:///%s' % os.path.join( @@ -363,72 +390,12 @@ class Modeltests(SimplePagureTest): """ Set up the environnment, ran before every tests. """ # Clean up test performance info super(Modeltests, self).setUp() - - # Start a worker - # Using cocurrency 2 to test with some concurrency, but not be heavy - # Using eventlet so that worker.terminate kills everything - self.workerlog = open(os.path.join('.', 'worker.log'), 'w') - celery_exec = 'celery' - celery_env = os.environ.copy() - celery_env.update({ - 'PAGURE_BROKER_URL': pagure_config['BROKER_URL'], - 'PAGURE_CONFIG': os.path.join(self.path, 'config'), - 'PYTHONPATH': '.' - }) - celery_cwd = os.path.normpath( - os.path.join(os.path.dirname(__file__), '..') - ) - self.worker = subprocess.Popen( - [celery_exec, '-A', 'pagure.lib.tasks', 'worker', - '--loglevel=info', '--concurrency=2', '--pool=eventlet', - '--without-gossip', '--without-mingle', '--quiet'], - env=celery_env, - cwd=celery_cwd, - stdout=self.workerlog, - stderr=self.workerlog) - self.worker.poll() - if self.worker.returncode is not None: - raise Exception('Worker failed to start') - - # We could do the ping below in-process: - # pagure.lib.tasks.conn.control.ping(timeout=0.1) - # but if we try it, Python starts raising OSError - # with too many open files. This is probably related - # to https://github.com/celery/celery/issues/4465 - wait_start = time.time() - while True: - time.sleep(0.1) - res = subprocess.call( - [celery_exec, '-A', 'pagure.lib.tasks', - 'inspect', '-t=0.1', 'ping'], - env=celery_env, - cwd=celery_cwd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - if res == 0: - break - if time.time() - wait_start > 5: - raise Exception('Worker failed to initialize in 5 seconds') - self.app.get = create_maybe_waiter(self.app.get, self.app.get) self.app.post = create_maybe_waiter(self.app.post, self.app.get) def tearDown(self): # pylint: disable=invalid-name """ Remove the test.db database if there is one. """ - # Terminate worker - # We just send a SIGKILL (kill -9), since when the test finishes, we - # don't really care about the output of either worker or broker - # anymore - self.worker.kill() - self.worker.wait() - self.worker = None - self.workerlog.close() - self.workerlog = None - subprocess.check_call( - ['/usr/bin/redis-cli', '-s', - pagure_config['BROKER_URL'][len('redis+socket://'):], 'flushall'], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + tests_state["broker_client"].flushall() super(Modeltests, self).tearDown() diff --git a/tests/test_pagure_flask_api_project.py b/tests/test_pagure_flask_api_project.py index 4a1cde3..52ffa84 100644 --- a/tests/test_pagure_flask_api_project.py +++ b/tests/test_pagure_flask_api_project.py @@ -19,6 +19,7 @@ import tempfile import os import pygit2 +from celery.result import EagerResult from mock import patch, Mock sys.path.insert(0, os.path.join(os.path.dirname( @@ -38,8 +39,7 @@ class PagureFlaskApiProjecttests(tests.Modeltests): self.gga_patcher = patch( 'pagure.lib.tasks.generate_gitolite_acls.delay') self.mock_gen_acls = self.gga_patcher.start() - task_result = Mock() - task_result.id = 'abc-1234' + task_result = EagerResult('abc-1234', True, "SUCCESS") self.mock_gen_acls.return_value = task_result def tearDown(self): @@ -2627,6 +2627,7 @@ class PagureFlaskApiProjecttests(tests.Modeltests): self.assertEqual(data, expected_output) self.mock_gen_acls.assert_called_once_with( name='test', namespace=None, user=None, group=None) + self.assertTrue(task_result.get.called) def test_api_generate_acls_no_project(self): """ Test the api_generate_acls method of the flask api when the project