#101 generic message format which matches the HTTP API v2
Merged 6 years ago by jskladan. Opened 6 years ago by dcallagh.
taskotron/ dcallagh/resultsdb issue-92  into  develop

file modified
+4 -1
@@ -69,7 +69,7 @@ 

      # the fedmsg plugin expects an extra `modname` argument that can be used to

      # configure the topic, like this:

      #   <topic_prefix>.<environment>.<modname>.<topic>

-     # e.g. org.fedoraproject.prod.taskotron.result.new

+     # e.g. org.fedoraproject.prod.resultsdb.result.new

      MESSAGE_BUS_KWARGS = {}

  

      ## Alternatively, you could use the 'stomp' messaging plugin.
@@ -88,6 +88,9 @@ 

      #    },

      #}

  

+     # Publish Taskotron-compatible fedmsgs on the 'taskotron' topic

+     MESSAGE_BUS_PUBLISH_TASKOTRON = False

+ 

  

  class ProductionConfig(Config):

      DEBUG = False

file modified
+10 -34
@@ -38,7 +38,7 @@ 

  from resultsdb.serializers.api_v1 import Serializer

  from resultsdb.models.results import Group, Result, Testcase, ResultData

  from resultsdb.models.results import JOB_STATUS, RESULT_OUTCOME

- from resultsdb.messaging import load_messaging_plugin

+ from resultsdb.messaging import load_messaging_plugin, create_message, publish_taskotron_message

  

  QUERY_LIMIT = 20

  
@@ -314,7 +314,7 @@ 

      except HTTPException as error:

          return jsonify(error.data), error.code

  

-     if not args['iuuid']:

+     if not args['uuid']:

          args['uuid'] = str(uuid.uuid1())

  

      job = Group(args['uuid'], args['ref_url'], args['name'])
@@ -565,40 +565,16 @@ 

      db.session.add(result)

  

      if app.config['MESSAGE_BUS_PUBLISH']:

-         prev_result = get_prev_result(result)

-         # result is considered duplicate of prev_result when

-         # outcomes are the same.

-         if not prev_result or prev_result.outcome != result.outcome:

-             plugin = load_messaging_plugin(

-                 name=app.config['MESSAGE_BUS_PLUGIN'],

-                 kwargs=app.config['MESSAGE_BUS_KWARGS'],

-             )

-             plugin.publish(plugin.create_message(result, prev_result))

+         plugin = load_messaging_plugin(

+             name=app.config['MESSAGE_BUS_PLUGIN'],

+             kwargs=app.config['MESSAGE_BUS_KWARGS'],

+         )

+         plugin.publish(create_message(result))

  

-     return jsonify(SERIALIZE(result)), 201

- 

- 

- def get_prev_result(result):

-     '''

-     Find previous result with the same:

-     item, testcase, outcome and arch.

- 

-     Return None if no result is found.

-     '''

-     q = db.session.query(Result).filter(Result.id != result.id)

- 

-     alias = db.aliased(Testcase)

-     q = q.join(alias).filter(alias.name == result.testcase.name)

+     if app.config['MESSAGE_BUS_PUBLISH_TASKOTRON']:

+         publish_taskotron_message(result)

  

-     for result_data in result.data:

-         if result_data.key in ['item', 'arch']:

-             alias = db.aliased(ResultData)

-             q = q.join(alias).filter(

-                 db.and_(alias.key == result_data.key, alias.value == result_data.value))

- 

-     q = q.order_by(db.desc(Result.submit_time))

- 

-     return q.first()

+     return jsonify(SERIALIZE(result)), 201

  

  

  # =============================================================================

@@ -35,7 +35,7 @@ 

  from resultsdb.serializers.api_v2 import Serializer

  from resultsdb.models.results import Group, Result, Testcase, ResultData

  from resultsdb.models.results import RESULT_OUTCOME

- from resultsdb.messaging import load_messaging_plugin

+ from resultsdb.messaging import load_messaging_plugin, create_message, publish_taskotron_message

  from resultsdb.lib.helpers import non_empty, dict_or_string, list_or_none

  

  QUERY_LIMIT = 20
@@ -669,40 +669,19 @@ 

  

      if app.config['MESSAGE_BUS_PUBLISH']:

          app.logger.debug("Preparing to publish message for result id %d", result.id)

-         prev_result = get_prev_result(result)

-         # result is considered duplicate of prev_result when

-         # outcomes are the same.

-         if not prev_result or prev_result.outcome != result.outcome:

-             plugin = load_messaging_plugin(

-                 name=app.config['MESSAGE_BUS_PLUGIN'],

-                 kwargs=app.config['MESSAGE_BUS_KWARGS'],

-             )

-             plugin.publish(plugin.create_message(result, prev_result))

-         else:

-             app.logger.debug("Skipping messaging, result %d outcome has not changed", result.id)

+         plugin = load_messaging_plugin(

+             name=app.config['MESSAGE_BUS_PLUGIN'],

+             kwargs=app.config['MESSAGE_BUS_KWARGS'],

+         )

+         plugin.publish(create_message(result))

  

+     if app.config['MESSAGE_BUS_PUBLISH_TASKOTRON']:

+         app.logger.debug("Preparing to publish Taskotron message for result id %d", result.id)

+         publish_taskotron_message(result)

  

      return jsonify(SERIALIZE(result)), 201

  

  

- def get_prev_result(result):

-     """

-     Find previous result with the same testcase, item, type, arch and

-     scenario. Return None if no result is found.

-     """

-     q = db.session.query(Result).filter(Result.id != result.id)

-     q = q.filter_by(testcase_name=result.testcase_name)

- 

-     for result_data in result.data:

-         if result_data.key in ['item', 'type', 'arch', 'scenario']:

-             alias = db.aliased(ResultData)

-             q = q.join(alias).filter(

-                 db.and_(alias.key == result_data.key, alias.value == result_data.value))

- 

-     q = q.order_by(db.desc(Result.submit_time))

-     return q.first()

- 

- 

  # =============================================================================

  #                                    TESTCASES

  # =============================================================================

file modified
+79 -68
@@ -24,15 +24,91 @@ 

  

  import fedmsg

  

+ from resultsdb import db

+ from resultsdb.models.results import Result, ResultData

+ from resultsdb.serializers.api_v2 import Serializer

+ 

  import logging

  log = logging.getLogger(__name__)

  

  

+ SERIALIZE = Serializer().serialize

+ 

+ 

+ def get_prev_result(result):

+     """

+     Find previous result with the same testcase, item, type, and arch.

+     Return None if no result is found.

+ 

+     Note that this logic is Taskotron-specific: it does not consider the 

+     possibility that a result may be distinguished by other keys in the data 

+     (for example 'scenario' which is used in OpenQA results). But this is only 

+     used for publishing Taskotron compatibility messages, thus we keep this 

+     logic as is.

+     """

+     q = db.session.query(Result).filter(Result.id != result.id)

+     q = q.filter_by(testcase_name=result.testcase_name)

+ 

+     for result_data in result.data:

+         if result_data.key in ['item', 'type', 'arch']:

+             alias = db.aliased(ResultData)

+             q = q.join(alias).filter(

+                 db.and_(alias.key == result_data.key, alias.value == result_data.value))

+ 

+     q = q.order_by(db.desc(Result.submit_time))

+     return q.first()

+ 

+ 

+ def publish_taskotron_message(result, include_job_url=False):

+     """

+     Publish a fedmsg on the taskotron topic with Taskotron-compatible structure.

+ 

+     These messages are deprecated, consumers should consume from the resultsdb 

+     topic instead.

+     """

+     prev_result = get_prev_result(result)

+     if prev_result is not None and prev_result.outcome == result.outcome:

+         # If the previous result had the same outcome, skip publishing 

+         # a message for this new result.

+         # This was intended as a workaround to avoid spammy messages from the 

+         # dist.depcheck task, which tends to produce a very large number of 

+         # identical results for any given build, because of the way that it is 

+         # designed.

+         log.debug("Skipping Taskotron message for result %d, outcome has not changed", result.id)

+         return

+ 

+     task = dict(

+         (datum.key, datum.value)

+         for datum in result.data

+         if datum.key in ('item', 'type',)

+     )

+     task['name'] = result.testcase.name

+     msg = {

+         'task': task,

+         'result': {

+             'id': result.id,

+             'submit_time': result.submit_time.strftime("%Y-%m-%d %H:%M:%S UTC"),

+             'prev_outcome': prev_result.outcome if prev_result else None,

+             'outcome': result.outcome,

+             'log_url': result.ref_url,

+         }

+     }

+ 

+     if include_job_url:  # only in the v1 API

+         msg['result']['job_url'] = result.groups[0].ref_url if result.groups else None

+ 

+     fedmsg.publish(topic='result.new', modname='taskotron', msg=msg)

+ 

+ 

+ def create_message(result):

+     # Re-use the same structure as in the HTTP API v2.

+     return SERIALIZE(result)

+ 

+ 

  class MessagingPlugin(object):

      """ Abstract base class that messaging plugins must extend.

  

-     Two abstract methods are declared which must be implemented:

-         - create_message(result, prev_result=None)

+     One abstract method is declared which must be implemented:

          - publish(message)

  

      """
@@ -43,10 +119,6 @@ 

              setattr(self, key, value)

  

      @abc.abstractmethod

-     def create_message(self, result, prev_result=None):

-         pass

- 

-     @abc.abstractmethod

      def publish(self, message):

          pass

  
@@ -61,46 +133,12 @@ 

          self.history.append(message)

          log.info("%r->%r" % (self, message))

  

-     def create_message(self, result, prev_result):

-         return dict(id=result.id)

- 

  

  class FedmsgPlugin(MessagingPlugin):

      """ A fedmsg plugin, used to publish to the fedmsg bus. """

  

      def publish(self, message):

-         fedmsg.publish(**message)

- 

-     def create_message(self, result, prev_result):

-         task = dict(

-             (datum.key, datum.value)

-             for datum in result.data

-         )

-         task['name'] = result.testcase.name

-         msg = {

-             'topic': 'result.new',

-             'modname': self.modname,

-             'msg': {

-                 'task': task,

-                 'result': {

-                     'id': result.id,

-                     'submit_time': result.submit_time.strftime("%Y-%m-%d %H:%M:%S UTC"),

-                     'prev_outcome': prev_result.outcome if prev_result else None,

-                     'outcome': result.outcome,

-                     'log_url': result.ref_url,

-                 }

-             }

-         }

- 

-         # For the v1 API

-         if hasattr(result, 'job'):

-             msg['msg']['result']['job_url'] = result.job.ref_url

- 

-         # For the v2 API

-         if hasattr(result, 'group'):

-             msg['msg']['result']['group_url'] = result.group.ref_url

- 

-         return msg

+         fedmsg.publish(topic='result.new', modname=self.modname, msg=message)

  

  

  class StompPlugin(MessagingPlugin):
@@ -133,33 +171,6 @@ 

          finally:

              conn.disconnect()

  

-     def create_message(self, result, prev_result):

-         task = dict(

-             (datum.key, datum.value)

-             for datum in result.data

-         )

-         task['name'] = result.testcase.name

-         msg = {

-             'task': task,

-             'result': {

-                 'id': result.id,

-                 'submit_time': result.submit_time.strftime("%Y-%m-%d %H:%M:%S UTC"),

-                 'prev_outcome': prev_result.outcome if prev_result else None,

-                 'outcome': result.outcome,

-                 'log_url': result.ref_url,

-             }

-         }

- 

-         # For the v1 API

-         if hasattr(result, 'job'):

-             msg['result']['job_url'] = result.job.ref_url

- 

-         # For the v2 API

-         if hasattr(result, 'group'):

-             msg['result']['group_url'] = result.group.ref_url

- 

-         return msg

- 

  

  def load_messaging_plugin(name, kwargs):

      """ Instantiate and return the appropriate messaging plugin. """

@@ -0,0 +1,85 @@ 

+ # Copyright 2016, Red Hat, Inc.

+ #

+ # This program is free software; you can redistribute it and/or modify

+ # it under the terms of the GNU General Public License as published by

+ # the Free Software Foundation; either version 2 of the License, or

+ # (at your option) any later version.

+ #

+ # This program is distributed in the hope that it will be useful,

+ # but WITHOUT ANY WARRANTY; without even the implied warranty of

+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

+ # GNU General Public License for more details.

+ #

+ # You should have received a copy of the GNU General Public License along

+ # with this program; if not, write to the Free Software Foundation, Inc.,

+ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

+ 

+ import json

+ import datetime

+ import os

+ import tempfile

+ import copy

+ 

+ import resultsdb

+ from resultsdb import db

+ import resultsdb.cli

+ import resultsdb.messaging

+ from resultsdb.models.results import Result

+ 

+ 

+ class TestFuncApiV10(object):

+ 

+     def setup_method(self, method):

+         self.dbfile = tempfile.NamedTemporaryFile(delete=False)

+         self.dbfile.close()

+         resultsdb.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///%s' % self.dbfile.name

+         resultsdb.app.config['MESSAGE_BUS_PUBLISH'] = True

+         resultsdb.app.config['MESSAGE_BUS_PLUGIN'] = 'dummy'

+         resultsdb.cli.initialize_db(destructive=True)

+         self.app = resultsdb.app.test_client()

+ 

+     def teardown_method(self, method):

+         resultsdb.messaging.DummyPlugin.history = []

+         db.session.remove()

+         os.unlink(self.dbfile.name)

+ 

+     def test_create_result(self):

+         job_data = {

+             'name': 'dist.rpmlint',

+             'ref_url': 'https://taskotron.example.com/execdb/',

+         }

+         r = self.app.post('/api/v1.0/jobs', data=json.dumps(job_data), content_type='application/json')

+         assert r.status_code == 201

+         job_id = json.loads(r.data)['id']

+         job_uuid = json.loads(r.data)['uuid']

+ 

+         result_data = {

+             'job_id': job_id,

+             'outcome': 'FAILED',

+             'testcase_name': 'dist.rpmlint',

+             'summary': '78 errors, 150 warnings',

+             'result_data': {

+                 'type': ['koji_build'],

+                 'item': ['openfst-1.6.6-1.fc28'],

+             },

+             'log_url': 'https://taskotron.example.com/artifacts/',

+         }

+         r = self.app.post('/api/v1.0/results', data=json.dumps(result_data), content_type='application/json')

+         assert r.status_code == 201

+         result_id = json.loads(r.data)['id']

+ 

+         # Check that the result was stored in the database.

+         result = db.session.query(Result).get(result_id)

+         assert result.outcome == 'FAILED'

+ 

+         # Check that a message was emitted.

+         plugin = resultsdb.messaging.DummyPlugin

+         assert len(plugin.history) == 1, plugin.history

+         assert plugin.history[0]['data']['item'] == ['openfst-1.6.6-1.fc28']

+         assert plugin.history[0]['data']['type'] == ['koji_build']

+         assert plugin.history[0]['id'] == 1

+         assert plugin.history[0]['outcome'] == 'FAILED'

+         assert plugin.history[0]['groups'] == [job_uuid]

+         assert plugin.history[0]['note'] == '78 errors, 150 warnings'

+         assert plugin.history[0]['ref_url'] == 'https://taskotron.example.com/artifacts/'

+         assert plugin.history[0]['testcase']['name'] == 'dist.rpmlint'

file modified
+8 -1
@@ -845,4 +845,11 @@ 

          self.helper_create_result()

          plugin = resultsdb.messaging.DummyPlugin

          assert len(plugin.history) == 1, plugin.history

-         assert plugin.history == [{'id': 1}]

+         assert plugin.history[0]['data']['item'] == [self.ref_result_item]

+         assert plugin.history[0]['data']['type'] == [self.ref_result_type]

+         assert plugin.history[0]['id'] == 1

+         assert plugin.history[0]['outcome'] == self.ref_result_outcome

+         assert plugin.history[0]['ref_url'] == self.ref_result_ref_url

+         assert plugin.history[0]['groups'] == [self.ref_group_uuid]

+         assert plugin.history[0]['note'] == self.ref_result_note

+         assert plugin.history[0]['testcase']['name'] == self.ref_testcase_name

@@ -25,7 +25,6 @@ 

  

  import resultsdb

  import resultsdb.cli

- import resultsdb.controllers.api_v2 as apiv2

  import resultsdb.messaging

  

  
@@ -38,7 +37,7 @@ 

  

  class MyResult(object):

  

-     def __init__(self, id, testcase_name, outcome, item, item_type, arch, scenario):

+     def __init__(self, id, testcase_name, outcome, item, item_type, arch):

          self.id = id

          self.testcase_name = testcase_name

          self.outcome = outcome
@@ -46,7 +45,6 @@ 

              MyResultData('item', item),

              MyResultData('type', item_type),

              MyResultData('arch', arch),

-             MyResultData('scenario', scenario),

          ]

  

  
@@ -89,18 +87,15 @@ 

          self.ref_result_item = 'perl-Specio-0.25-1.fc26'

          self.ref_result_type = 'koji_build'

          self.ref_result_arch = 'x86_64'

-         self.ref_result_scenario = 'x86_64.efi'

          self.ref_result_data = {

              'item': self.ref_result_item,

              'type': self.ref_result_type,

              'arch': self.ref_result_arch,

-             'scenario': self.ref_result_scenario,

              'moo': ['boo', 'woof'],

          }

          self.ref_result_ref_url = 'http://example.com/testing.result'

          self.ref_result_obj = MyResult(

-             0, self.ref_testcase_name, self.ref_result_outcome, self.ref_result_item,

-             self.ref_result_type, self.ref_result_arch, self.ref_result_scenario)

+             0, self.ref_testcase_name, self.ref_result_outcome, self.ref_result_item, self.ref_result_type, self.ref_result_arch)

  

      def teardown_method(self, method):

          # Reset this for each test.
@@ -131,12 +126,12 @@ 

          return r, data

  

      def test_get_prev_result_no_results(self):

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

          assert prev_result is None

  

      def test_get_prev_result_exists(self):

          self.helper_create_result()

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

  

          assert prev_result.id == 1

          assert prev_result.outcome == self.ref_result_outcome
@@ -148,11 +143,9 @@ 

                  assert result_data.value == self.ref_result_type

              if result_data.key == 'arch':

                  assert result_data.value == self.ref_result_arch

-             if result_data.key == 'scenario':

-                 assert result_data.value == self.ref_result_scenario

  

          self.helper_create_result()

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

  

          assert prev_result.id == 2

          assert prev_result.outcome == self.ref_result_outcome
@@ -164,14 +157,12 @@ 

                  assert result_data.value == self.ref_result_type

              if result_data.key == 'arch':

                  assert result_data.value == self.ref_result_arch

-             if result_data.key == 'scenario':

-                 assert result_data.value == self.ref_result_scenario

  

          ref_outcome = 'FAILED'

          if self.ref_result_outcome == ref_outcome:

              ref_outcome = 'PASSED'

          self.helper_create_result(outcome=ref_outcome)

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

  

          assert prev_result.id == 3

          assert prev_result.outcome == ref_outcome
@@ -183,15 +174,13 @@ 

                  assert result_data.value == self.ref_result_type

              if result_data.key == 'arch':

                  assert result_data.value == self.ref_result_arch

-             if result_data.key == 'scenario':

-                 assert result_data.value == self.ref_result_scenario

  

      def test_get_prev_result_different_item(self):

          data = copy.deepcopy(self.ref_result_data)

          data['item'] = data['item'] + '.fake'

          self.helper_create_result(data=data)

  

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

          assert prev_result is None

  

      def test_get_prev_result_different_type(self):
@@ -199,7 +188,7 @@ 

          data['type'] = data['type'] + '.fake'

          self.helper_create_result(data=data)

  

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

          assert prev_result is None

  

      def test_get_prev_result_different_arch(self):
@@ -207,25 +196,11 @@ 

          data['arch'] = data['arch'] + '.fake'

          self.helper_create_result(data=data)

  

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

-         assert prev_result is None

- 

-     def test_get_prev_result_different_scenario(self):

-         data = copy.deepcopy(self.ref_result_data)

-         data['scenario'] = data['scenario'] + '.fake'

-         self.helper_create_result(data=data)

- 

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

          assert prev_result is None

  

      def test_get_prev_result_different_testcase_name(self):

          self.helper_create_result(testcase={'name': self.ref_testcase_name + '.fake'})

  

-         prev_result = apiv2.get_prev_result(self.ref_result_obj)

+         prev_result = resultsdb.messaging.get_prev_result(self.ref_result_obj)

          assert prev_result is None

- 

-     def test_message_publication(self):

-         self.helper_create_result()

-         plugin = resultsdb.messaging.DummyPlugin

-         assert len(plugin.history) == 1, plugin.history

-         assert plugin.history == [{'id': 1}]

Fixes #92.

This changes the message structure to contain the complete result data, in the same format as the HTTP API v2.

The existing message structure and de-duplication logic are preserved in the new publish_taskotron_message() function, which is disabled by default but can be enabled for backwards compatibility by setting MESSAGE_BUS_PUBLISH_TASKOTRON=True. This function is hardcoded to publish only via fedmsg, with modname='taskotron' and topic='result.new'.

This turned out to be quite a yak-shave...

You may find it easier to review each commit individually. There were quite a few things I had to fix up to get all of this working.

Note that this patch series is a net increase in test coverage: previously there were two create_message() functions, neither of them covered. Now there is one and it is covered. There were previously two get_prev_result() functions, of which only one was covered. Now there is only one (and it is still covered).

However, the code path for producing Taskotron-compatible fedmsgs is still not covered. I have tested it by hand with fedmsg-relay and fedmsg-logger.

I would like to improve the tests further, to test sending real fedmsgs and STOMP messages, but I didn't want this PR to get even bigger. We could tackle that in #100.

Also just to clarify the migration path here:

My intention is that Fedora's deployment of resultsdb (which is currently sending to fedmsg with modname='taskotron') would be reconfigured to send the new generic messages to fedmsg with modname='resultsdb', and that the Taskotron-compatible messaging would also be turned on with MESSAGE_BUS_PUBLISH_TASKOTRON=True.

In the Red Hat internal deployment we are not currently concerned about backwards compatibility in the messages and we are already using 'resultsdb' in the topic name, so we would simply continue doing that with the updated message structure.

Also note that this PR intentionally reverts the previous PR#91, PR#93, and PR#99 to restore the previous behaviour of the Taskotron-specific message publishing code.

@adamwill - this might be of interest to you

Haven't looked at it closely, but if I understand @dcallagh's description of what it does and the expected message topic correctly, SGTM.

Pull-Request has been merged by jskladan

6 years ago