From aa54e5b94e37dd1beccc78927dc6c06c856041b8 Mon Sep 17 00:00:00 2001 From: Dan Callaghan Date: Feb 21 2018 05:02:55 +0000 Subject: generic message format which matches the HTTP API v2 Fixes #92. This changes the message structure to contain the complete result data, in the same format as the HTTP API v2. Existing message structure and de-duplication logic is preserved in the new publish_taskotron_message() function, which is disabled by default but can be enabled for backwards compatibility by setting MESSAGE_BUS_PUBLISH_TASKOTRON=True. This function is hardcoded to only send on fedmsg and with 'taskotron' as the topic. --- diff --git a/resultsdb/config.py b/resultsdb/config.py index 564cf4f..e4ba6b8 100644 --- a/resultsdb/config.py +++ b/resultsdb/config.py @@ -69,7 +69,7 @@ class Config(object): # the fedmsg plugin expects an extra `modname` argument that can be used to # configure the topic, like this: # ... - # e.g. org.fedoraproject.prod.taskotron.result.new + # e.g. org.fedoraproject.prod.resultsdb.result.new MESSAGE_BUS_KWARGS = {} ## Alternatively, you could use the 'stomp' messaging plugin. @@ -88,6 +88,9 @@ class Config(object): # }, #} + # Publish Taskotron-compatible fedmsgs on the 'taskotron' topic + MESSAGE_BUS_PUBLISH_TASKOTRON = False + class ProductionConfig(Config): DEBUG = False diff --git a/resultsdb/controllers/api_v1.py b/resultsdb/controllers/api_v1.py index 1e1ac20..1eda3f1 100644 --- a/resultsdb/controllers/api_v1.py +++ b/resultsdb/controllers/api_v1.py @@ -38,7 +38,7 @@ from resultsdb import app, db from resultsdb.serializers.api_v1 import Serializer from resultsdb.models.results import Group, Result, Testcase, ResultData from resultsdb.models.results import JOB_STATUS, RESULT_OUTCOME -from resultsdb.messaging import load_messaging_plugin, create_message +from resultsdb.messaging import load_messaging_plugin, create_message, publish_taskotron_message QUERY_LIMIT = 20 @@ -565,40 +565,16 @@ def create_result(): db.session.add(result) if app.config['MESSAGE_BUS_PUBLISH']: - prev_result = get_prev_result(result) - # result is considered duplicate of prev_result when - # outcomes are the same. - if not prev_result or prev_result.outcome != result.outcome: - plugin = load_messaging_plugin( - name=app.config['MESSAGE_BUS_PLUGIN'], - kwargs=app.config['MESSAGE_BUS_KWARGS'], - ) - plugin.publish(create_message(result, prev_result, include_job_url=True)) + plugin = load_messaging_plugin( + name=app.config['MESSAGE_BUS_PLUGIN'], + kwargs=app.config['MESSAGE_BUS_KWARGS'], + ) + plugin.publish(create_message(result)) - return jsonify(SERIALIZE(result)), 201 - - -def get_prev_result(result): - ''' - Find previous result with the same: - item, testcase, outcome and arch. - - Return None if no result is found. - ''' - q = db.session.query(Result).filter(Result.id != result.id) - - alias = db.aliased(Testcase) - q = q.join(alias).filter(alias.name == result.testcase.name) + if app.config['MESSAGE_BUS_PUBLISH_TASKOTRON']: + publish_taskotron_message(result) - for result_data in result.data: - if result_data.key in ['item', 'arch']: - alias = db.aliased(ResultData) - q = q.join(alias).filter( - db.and_(alias.key == result_data.key, alias.value == result_data.value)) - - q = q.order_by(db.desc(Result.submit_time)) - - return q.first() + return jsonify(SERIALIZE(result)), 201 # ============================================================================= diff --git a/resultsdb/controllers/api_v2.py b/resultsdb/controllers/api_v2.py index d442ddd..983f5ed 100644 --- a/resultsdb/controllers/api_v2.py +++ b/resultsdb/controllers/api_v2.py @@ -35,7 +35,7 @@ from resultsdb import app, db from resultsdb.serializers.api_v2 import Serializer from resultsdb.models.results import Group, Result, Testcase, ResultData from resultsdb.models.results import RESULT_OUTCOME -from resultsdb.messaging import load_messaging_plugin, create_message +from resultsdb.messaging import load_messaging_plugin, create_message, publish_taskotron_message from resultsdb.lib.helpers import non_empty, dict_or_string, list_or_none QUERY_LIMIT = 20 @@ -669,40 +669,19 @@ def create_result(): if app.config['MESSAGE_BUS_PUBLISH']: app.logger.debug("Preparing to publish message for result id %d", result.id) - prev_result = get_prev_result(result) - # result is considered duplicate of prev_result when - # outcomes are the same. - if not prev_result or prev_result.outcome != result.outcome: - plugin = load_messaging_plugin( - name=app.config['MESSAGE_BUS_PLUGIN'], - kwargs=app.config['MESSAGE_BUS_KWARGS'], - ) - plugin.publish(create_message(result, prev_result)) - else: - app.logger.debug("Skipping messaging, result %d outcome has not changed", result.id) + plugin = load_messaging_plugin( + name=app.config['MESSAGE_BUS_PLUGIN'], + kwargs=app.config['MESSAGE_BUS_KWARGS'], + ) + plugin.publish(create_message(result)) + if app.config['MESSAGE_BUS_PUBLISH_TASKOTRON']: + app.logger.debug("Preparing to publish Taskotron message for result id %d", result.id) + publish_taskotron_message(result) return jsonify(SERIALIZE(result)), 201 -def get_prev_result(result): - """ - Find previous result with the same testcase, item, type, and arch. - Return None if no result is found. - """ - q = db.session.query(Result).filter(Result.id != result.id) - q = q.filter_by(testcase_name=result.testcase_name) - - for result_data in result.data: - if result_data.key in ['item', 'type', 'arch']: - alias = db.aliased(ResultData) - q = q.join(alias).filter( - db.and_(alias.key == result_data.key, alias.value == result_data.value)) - - q = q.order_by(db.desc(Result.submit_time)) - return q.first() - - # ============================================================================= # TESTCASES # ============================================================================= diff --git a/resultsdb/messaging.py b/resultsdb/messaging.py index c4c63b4..c08e27e 100644 --- a/resultsdb/messaging.py +++ b/resultsdb/messaging.py @@ -24,11 +24,59 @@ import pkg_resources import fedmsg +from resultsdb import db +from resultsdb.models.results import Result, ResultData +from resultsdb.serializers.api_v2 import Serializer + import logging log = logging.getLogger(__name__) -def create_message(result, prev_result=None, include_job_url=False): +SERIALIZE = Serializer().serialize + + +def get_prev_result(result): + """ + Find previous result with the same testcase, item, type, and arch. + Return None if no result is found. + + Note that this logic is Taskotron-specific: it does not consider the + possibility that a result may be distinguished by other keys in the data + (for example 'scenario' which is used in OpenQA results). But this is only + used for publishing Taskotron compatibility messages, thus we keep this + logic as is. + """ + q = db.session.query(Result).filter(Result.id != result.id) + q = q.filter_by(testcase_name=result.testcase_name) + + for result_data in result.data: + if result_data.key in ['item', 'type', 'arch']: + alias = db.aliased(ResultData) + q = q.join(alias).filter( + db.and_(alias.key == result_data.key, alias.value == result_data.value)) + + q = q.order_by(db.desc(Result.submit_time)) + return q.first() + + +def publish_taskotron_message(result, include_job_url=False): + """ + Publish a fedmsg on the taskotron topic with Taskotron-compatible structure. + + These messages are deprecated, consumers should consume from the resultsdb + topic instead. + """ + prev_result = get_prev_result(result) + if prev_result is not None and prev_result.outcome == result.outcome: + # If the previous result had the same outcome, skip publishing + # a message for this new result. + # This was intended as a workaround to avoid spammy messages from the + # dist.depcheck task, which tends to produce a very large number of + # identical results for any given build, because of the way that it is + # designed. + log.debug("Skipping Taskotron message for result %d, outcome has not changed", result.id) + return + task = dict( (datum.key, datum.value) for datum in result.data @@ -49,7 +97,12 @@ def create_message(result, prev_result=None, include_job_url=False): if include_job_url: # only in the v1 API msg['result']['job_url'] = result.groups[0].ref_url if result.groups else None - return msg + fedmsg.publish(topic='result.new', modname='taskotron', msg=msg) + + +def create_message(result): + # Re-use the same structure as in the HTTP API v2. + return SERIALIZE(result) class MessagingPlugin(object): diff --git a/testing/functest_api_v10.py b/testing/functest_api_v10.py index ca544f3..9ab59d1 100644 --- a/testing/functest_api_v10.py +++ b/testing/functest_api_v10.py @@ -51,6 +51,7 @@ class TestFuncApiV10(object): r = self.app.post('/api/v1.0/jobs', data=json.dumps(job_data), content_type='application/json') assert r.status_code == 201 job_id = json.loads(r.data)['id'] + job_uuid = json.loads(r.data)['uuid'] result_data = { 'job_id': job_id, @@ -74,9 +75,11 @@ class TestFuncApiV10(object): # Check that a message was emitted. plugin = resultsdb.messaging.DummyPlugin assert len(plugin.history) == 1, plugin.history - assert plugin.history[0]['task']['item'] == 'openfst-1.6.6-1.fc28' - assert plugin.history[0]['task']['type'] == 'koji_build' - assert plugin.history[0]['result']['id'] == 1 - assert plugin.history[0]['result']['outcome'] == 'FAILED' - assert plugin.history[0]['result']['log_url'] == 'https://taskotron.example.com/artifacts/' - assert plugin.history[0]['result']['job_url'] == 'https://taskotron.example.com/execdb/' + assert plugin.history[0]['data']['item'] == ['openfst-1.6.6-1.fc28'] + assert plugin.history[0]['data']['type'] == ['koji_build'] + assert plugin.history[0]['id'] == 1 + assert plugin.history[0]['outcome'] == 'FAILED' + assert plugin.history[0]['groups'] == [job_uuid] + assert plugin.history[0]['note'] == '78 errors, 150 warnings' + assert plugin.history[0]['ref_url'] == 'https://taskotron.example.com/artifacts/' + assert plugin.history[0]['testcase']['name'] == 'dist.rpmlint' diff --git a/testing/functest_api_v20.py b/testing/functest_api_v20.py index 7f639cf..36080b9 100644 --- a/testing/functest_api_v20.py +++ b/testing/functest_api_v20.py @@ -845,8 +845,11 @@ class TestFuncApiV20(): self.helper_create_result() plugin = resultsdb.messaging.DummyPlugin assert len(plugin.history) == 1, plugin.history - assert plugin.history[0]['task']['item'] == self.ref_result_item - assert plugin.history[0]['task']['type'] == self.ref_result_type - assert plugin.history[0]['result']['id'] == 1 - assert plugin.history[0]['result']['outcome'] == self.ref_result_outcome - assert plugin.history[0]['result']['log_url'] == self.ref_result_ref_url + assert plugin.history[0]['data']['item'] == [self.ref_result_item] + assert plugin.history[0]['data']['type'] == [self.ref_result_type] + assert plugin.history[0]['id'] == 1 + assert plugin.history[0]['outcome'] == self.ref_result_outcome + assert plugin.history[0]['ref_url'] == self.ref_result_ref_url + assert plugin.history[0]['groups'] == [self.ref_group_uuid] + assert plugin.history[0]['note'] == self.ref_result_note + assert plugin.history[0]['testcase']['name'] == self.ref_testcase_name diff --git a/testing/functest_create_fedmsg.py b/testing/functest_create_fedmsg.py index e0b5ed3..bd1ad5e 100644 --- a/testing/functest_create_fedmsg.py +++ b/testing/functest_create_fedmsg.py @@ -25,7 +25,6 @@ import copy import resultsdb import resultsdb.cli -import resultsdb.controllers.api_v2 as apiv2 import resultsdb.messaging @@ -127,12 +126,12 @@ class TestFuncCreateFedmsg(): return r, data def test_get_prev_result_no_results(self): - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result is None def test_get_prev_result_exists(self): self.helper_create_result() - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result.id == 1 assert prev_result.outcome == self.ref_result_outcome @@ -146,7 +145,7 @@ class TestFuncCreateFedmsg(): assert result_data.value == self.ref_result_arch self.helper_create_result() - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result.id == 2 assert prev_result.outcome == self.ref_result_outcome @@ -163,7 +162,7 @@ class TestFuncCreateFedmsg(): if self.ref_result_outcome == ref_outcome: ref_outcome = 'PASSED' self.helper_create_result(outcome=ref_outcome) - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result.id == 3 assert prev_result.outcome == ref_outcome @@ -181,7 +180,7 @@ class TestFuncCreateFedmsg(): data['item'] = data['item'] + '.fake' self.helper_create_result(data=data) - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result is None def test_get_prev_result_different_type(self): @@ -189,7 +188,7 @@ class TestFuncCreateFedmsg(): data['type'] = data['type'] + '.fake' self.helper_create_result(data=data) - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result is None def test_get_prev_result_different_arch(self): @@ -197,11 +196,11 @@ class TestFuncCreateFedmsg(): data['arch'] = data['arch'] + '.fake' self.helper_create_result(data=data) - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result is None def test_get_prev_result_different_testcase_name(self): self.helper_create_result(testcase={'name': self.ref_testcase_name + '.fake'}) - prev_result = apiv2.get_prev_result(self.ref_result_obj) + prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj) assert prev_result is None