#51 WIP: publish messages when decision contexts change
Merged 6 years ago by ralph. Opened 6 years ago by mjia.
mjia/greenwave fedmsg  into  master

file added
+132
@@ -0,0 +1,132 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: GPL-2.0+

+ 

+ import socket

+ 

+ hostname = socket.gethostname()

+ 

+ config = dict(

+     active=True,

+     # Set this to dev if you're hacking on fedmsg or an app.

+     # Set to stg or prod if running in the Fedora Infrastructure

+     environment="dev",

+ 

+     # Default is 0

+     high_water_mark=0,

+     io_threads=1,

+ 

+     # For the fedmsg-hub and fedmsg-relay. ##

+ 

+     # This is a status dir to keep a record of the last processed message

+     #status_directory=os.getcwd() + "/status",

+     #status_directory='/var/run/fedmsg/status',

+ 

+     # This is the URL of a datagrepper instance that we can query for backlog.

+     #datagrepper_url="https://apps.fedoraproject.org/datagrepper/raw",

+ 

+     # We almost always want the fedmsg-hub to be sending messages with zmq as

+     # opposed to amqp or stomp.  You can send with only *one* of the messaging

+     # backends: zeromq or amqp or stomp.  You cannot send with two or more at

+     # the same time.  Here, zmq is either enabled, or it is not.  If it is not,

+     # see the options below for how to configure stomp or amqp.

+     zmq_enabled=True,

+ 

+     # On the other hand, if you wanted to use STOMP *instead* of zeromq, you

+     # could do the following...

+     #zmq_enabled=False,

+     #stomp_uri='localhost:59597,localhost:59598',

+     #stomp_user='username',

+     #stomp_pass='password',

+     #stomp_ssl_crt='/path/to/an/optional.crt',

+     #stomp_ssl_key='/path/to/an/optional.key',

+ 

+     # When subscribing to messages, we want to allow splats ('*') so we tell

+     # the hub to not be strict when comparing messages topics to subscription

+     # topics.

+     zmq_strict=False,

+ 

+     # Number of seconds to sleep after initializing waiting for sockets to sync

+     post_init_sleep=0.5,

+ 

+     # Wait a whole second to kill all the last io threads for messages to

+     # exit our outgoing queue (if we have any).  This is in milliseconds.

+     zmq_linger=1000,

+ 

+     # See the following

+     #   - http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html

+     #   - http://api.zeromq.org/3-2:zmq-setsockopt

+     zmq_tcp_keepalive=1,

+     zmq_tcp_keepalive_cnt=3,

+     zmq_tcp_keepalive_idle=60,

+     zmq_tcp_keepalive_intvl=5,

+ 

+     # Number of milliseconds that zeromq will wait to reconnect until it gets

+     # a connection if an endpoint is unavailable.

+     zmq_reconnect_ivl=100,

+     # Max delay that you can reconfigure to reduce reconnect storm spam. This

+     # is in milliseconds.

+     zmq_reconnect_ivl_max=1000,

+ 

+     # This is a dict of possible addresses from which fedmsg can send

+     # messages.  fedmsg.init(...) requires that a 'name' argument be passed

+     # to it which corresponds with one of the keys in this dict.

+     endpoints={

+         "greenwave.%s" % hostname: [

+             "tcp://127.0.0.1:5011",

+         ],

+         "relay_outbound": [

+             "tcp://127.0.0.1:4001",

+         ],

+     },

+     # This is the address of an active->passive relay.  It is used for the

+     # fedmsg-logger command which requires another service with a stable

+     # listening address for it to send messages to.

+     # It is also used by the git-hook, for the same reason.

+     # It is also used by the mediawiki php plugin which, due to the oddities of

+     # php, can't maintain a single passive-bind endpoint of its own.

+     relay_inbound=[

+         "tcp://127.0.0.1:2003",

+     ],

+     sign_messages=False,

+     validate_signatures=False,

+ 

+     # Use these implementations to sign and validate messages

+     crypto_backend='x509',

+     crypto_validate_backends=['x509'],

+ 

+     ssldir="/etc/pki/fedmsg",

+     crl_location="https://fedoraproject.org/fedmsg/crl.pem",

+     crl_cache="/var/run/fedmsg/crl.pem",

+     crl_cache_expiry=10,

+ 

+     ca_cert_location="https://fedoraproject.org/fedmsg/ca.crt",

+     ca_cert_cache="/var/run/fedmsg/ca.crt",

+     ca_cert_cache_expiry=0,  # Never expires

+ 

+     certnames={

+         # In prod/stg, map hostname to the name of the cert in ssldir.

+         # Unfortunately, we can't use socket.getfqdn()

+         #"app01.stg": "app01.stg.phx2.fedoraproject.org",

+     },

+ 

+     # A mapping of fully qualified topics to a list of cert names for which

+     # a valid signature is to be considered authorized.  Messages on topics not

+     # listed here are considered automatically authorized.

+     routing_policy={

+         # Only allow announcements from production if they're signed by a

+         # certain certificate.

+         "org.fedoraproject.prod.announce.announcement": [

+             "announce-lockbox.phx2.fedoraproject.org",

+         ],

+     },

+ 

+     # Set this to True if you want messages to be dropped that aren't

+     # explicitly whitelisted in the routing_policy.

+     # When this is False, only messages that have a topic in the routing_policy

+     # but whose cert names aren't in the associated list are dropped; messages

+     # whose topics do not appear in the routing_policy are not dropped.

+     routing_nitpicky=False,

+ 

+     # Greenwave API url

+     greenwave_api_url='https://greenwave.domain.local/api/v1.0'

+ )

@@ -0,0 +1,3 @@ 

+ config = dict(

+     resultsdb_handler=True

+ )

@@ -0,0 +1,3 @@ 

+ config = dict(

+     waiverdb_handler=True

+ )

@@ -0,0 +1,124 @@ 

+ # SPDX-License-Identifier: GPL-2.0+

+ 

+ import mock

+ import json

+ 

+ from greenwave.consumers import resultsdb

+ 

+ 

+ @mock.patch('greenwave.consumers.resultsdb.fedmsg.config.load_config')

+ @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ def test_consume_new_result(

+         mock_fedmsg, load_config, requests_session, greenwave_server, testdatabuilder, monkeypatch):

+     monkeypatch.setenv('TEST', 'true')

+     load_config.return_value = {'greenwave_api_url': greenwave_server.url + 'api/v1.0'}

+     nvr = testdatabuilder.unique_nvr()

+     result = testdatabuilder.create_result(item=nvr,

+                                            testcase_name='dist.rpmdeplint',

+                                            outcome='PASSED')

+     message = {

+         'topic': 'taskotron.result.new',

+         'msg': {

+             'result': {

+                 'id': result['id'],

+                 'outcome': 'PASSED'

+             },

+             'task': {

+                 'item': nvr,

+                 'type': 'koji_build',

+                 'name': 'dist.rpmdeplint'

+             }

+         }

+     }

+     hub = mock.MagicMock()

+     hub.config = {'environment': 'environment', 'topic_prefix': 'topic_prefix'}

+     handler = resultsdb.ResultsDBHandler(hub)

+     assert handler.topic == ['topic_prefix.environment.taskotron.result.new']

+     handler.consume(message)

+ 

+     # get old decision

+     data = {

+         'decision_context': 'bodhi_update_push_stable',

+         'product_version': 'fedora-26',

+         'subject': [{'item': nvr, 'type': 'koji_build'}],

+         'ignore_result': [result['id']]

+     }

+     r = requests_session.post(greenwave_server.url + 'api/v1.0/decision',

+                               headers={'Content-Type': 'application/json'},

+                               data=json.dumps(data))

+     assert r.status_code == 200

+     old_decision = r.json()

+ 

+     msg = {

+         'policies_satisified': False,

+         'decision_context': 'bodhi_update_push_stable',

+         'product_version': 'fedora-26',

+         'unsatisfied_requirements': [

+             {

+                 'testcase': 'dist.abicheck',

+                 'item': {

+                     'item': nvr,

+                     'type': 'koji_build'

+                 },

+                 'type': 'test-result-missing'

+             },

+             {

+                 'testcase': 'dist.upgradepath',

+                 'item': {

+                     'item': nvr,

+                     'type': 'koji_build'

+                 },

+                 'type': 'test-result-missing'

+             }

+         ],

+         'summary': '2 of 3 required tests not found',

+         'subject': [

+             {

+                 'item': nvr,

+                 'type': 'koji_build'

+             }

+         ],

+         'applicable_policies': ['taskotron_release_critical_tasks'],

+         'previous': old_decision,

+     }

+     mock_fedmsg.assert_called_once_with(

+         topic='greenwave.decision.update', msg=msg)

+ 

+ 

+ @mock.patch('greenwave.consumers.resultsdb.fedmsg.config.load_config')

+ @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ def test_no_message_for_unchanged_decision(

+         mock_fedmsg, load_config, requests_session, greenwave_server, testdatabuilder, monkeypatch):

+     monkeypatch.setenv('TEST', 'true')

+     load_config.return_value = {'greenwave_api_url': greenwave_server.url + 'api/v1.0'}

+     nvr = testdatabuilder.unique_nvr()

+     testdatabuilder.create_result(item=nvr,

+                                   testcase_name='dist.rpmdeplint',

+                                   outcome='PASSED')

+     # create another new result for dist.rpmdeplint which passed again.

+     new_result = testdatabuilder.create_result(

+         item=nvr,

+         testcase_name='dist.rpmdeplint',

+         outcome='PASSED')

+     message = {

+         'topic': 'taskotron.result.new',

+         'msg': {

+             'result': {

+                 'id': new_result['id'],

+                 'outcome': 'PASSED'

+             },

+             'task': {

+                 'item': nvr,

+                 'type': 'koji_build',

+                 'name': 'dist.rpmdeplint'

+             }

+         }

+     }

+     hub = mock.MagicMock()

+     hub.config = {'environment': 'environment', 'topic_prefix': 'topic_prefix'}

+     handler = resultsdb.ResultsDBHandler(hub)

+     assert handler.topic == ['topic_prefix.environment.taskotron.result.new']

+     handler.consume(message)

+     # No message should be published as the decision is unchanged since we

+     # are still missing the required tests.

+     mock_fedmsg.assert_not_called()

@@ -0,0 +1,79 @@ 

+ # SPDX-License-Identifier: GPL-2.0+

+ 

+ import mock

+ import json

+ 

+ from greenwave.consumers import waiverdb

+ 

+ 

+ TASKTRON_RELEASE_CRITICAL_TASKS = [

+     'dist.abicheck',

+     'dist.rpmdeplint',

+     'dist.upgradepath',

+ ]

+ 

+ 

+ @mock.patch('greenwave.consumers.resultsdb.fedmsg.config.load_config')

+ @mock.patch('greenwave.consumers.waiverdb.fedmsg.publish')

+ def test_consume_new_waiver(

+         mock_fedmsg, load_config, requests_session, greenwave_server, testdatabuilder, monkeypatch):

+     monkeypatch.setenv('TEST', 'true')

+     load_config.return_value = {'greenwave_api_url': greenwave_server.url + 'api/v1.0'}

+     nvr = testdatabuilder.unique_nvr()

+     result = testdatabuilder.create_result(item=nvr,

+                                            testcase_name='dist.abicheck',

+                                            outcome='FAILED')

+     # The rest passed

+     for testcase_name in TASKTRON_RELEASE_CRITICAL_TASKS[1:]:

+         testdatabuilder.create_result(item=nvr,

+                                       testcase_name=testcase_name,

+                                       outcome='PASSED')

+     waiver = testdatabuilder.create_waiver(result_id=result['id'], product_version='fedora-26')

+     message = {

+         'topic': 'waiver.new',

+         "msg": {

+             "id": waiver['id'],

+             "comment": "Because I said so",

+             "username": "foo",

+             "waived": "true",

+             "timestamp": "2017-08-10T17:42:04.209638",

+             "product_version": "fedora-26",

+             "result_id": result['id'],

+         }

+     }

+     hub = mock.MagicMock()

+     hub.config = {'environment': 'environment', 'topic_prefix': 'topic_prefix'}

+     handler = waiverdb.WaiverDBHandler(hub)

+     assert handler.topic == ['topic_prefix.environment.waiver.new']

+     handler.consume(message)

+ 

+     # get old decision

+     data = {

+         'decision_context': 'bodhi_update_push_stable',

+         'product_version': 'fedora-26',

+         'subject': [{'item': [nvr], 'type': ['koji_build']}],

+         'ignore_waiver': [waiver['id']]

+     }

+     r = requests_session.post(greenwave_server.url + 'api/v1.0/decision',

+                               headers={'Content-Type': 'application/json'},

+                               data=json.dumps(data))

+     assert r.status_code == 200

+     old_decision = r.json()

+ 

+     msg = {

+         'policies_satisified': True,

+         'decision_context': 'bodhi_update_push_stable',

+         'unsatisfied_requirements': [],

+         'summary': 'all required tests passed',

+         'product_version': 'fedora-26',

+         'subject': [

+             {

+                 'item': [nvr],

+                 'type': ['koji_build']

+             }

+         ],

+         'applicable_policies': ['taskotron_release_critical_tasks'],

+         'previous': old_decision,

+     }

+     mock_fedmsg.assert_called_once_with(

+         topic='greenwave.decision.update', msg=msg)

@@ -367,3 +367,93 @@ 

          },

      ]

      assert res_data['unsatisfied_requirements'] == expected_unsatisfied_requirements

+ 

+ 

+ def test_ignore_result(requests_session, greenwave_server, testdatabuilder):

+     """

+     This tests that a result can be ignored when making the decision.

+     """

+     nvr = testdatabuilder.unique_nvr()

+     result = testdatabuilder.create_result(

+         item=nvr,

+         testcase_name=TASKTRON_RELEASE_CRITICAL_TASKS[0],

+         outcome='PASSED')

+     for testcase_name in TASKTRON_RELEASE_CRITICAL_TASKS[1:]:

+         testdatabuilder.create_result(item=nvr,

+                                       testcase_name=testcase_name,

+                                       outcome='PASSED')

+     data = {

+         'decision_context': 'bodhi_update_push_stable',

+         'product_version': 'fedora-26',

+         'subject': [{'item': nvr, 'type': 'koji_build'}]

+     }

+     r = requests_session.post(greenwave_server.url + 'api/v1.0/decision',

+                               headers={'Content-Type': 'application/json'},

+                               data=json.dumps(data))

+     assert r.status_code == 200

+     res_data = r.json()

+     assert res_data['policies_satisified'] is True

+     # Ignore one passing result

+     data.update({

+         'ignore_result': [result['id']]

+     })

+     r = requests_session.post(greenwave_server.url + 'api/v1.0/decision',

+                               headers={'Content-Type': 'application/json'},

+                               data=json.dumps(data))

+     expected_unsatisfied_requirements = [

+         {

+             'item': {'item': nvr, 'type': 'koji_build'},

+             'testcase': TASKTRON_RELEASE_CRITICAL_TASKS[0],

+             'type': 'test-result-missing'

+         },

+     ]

+     assert r.status_code == 200

+     res_data = r.json()

+     assert res_data['policies_satisified'] is False

+     assert res_data['unsatisfied_requirements'] == expected_unsatisfied_requirements

+ 

+ 

+ def test_ignore_waiver(requests_session, greenwave_server, testdatabuilder):

+     """

+     This tests that a waiver can be ignored when making the decision.

+     """

+     nvr = testdatabuilder.unique_nvr()

+     result = testdatabuilder.create_result(item=nvr,

+                                            testcase_name=all_rpmdiff_testcase_names[0],

+                                            outcome='FAILED')

+     waiver = testdatabuilder.create_waiver(result_id=result['id'], product_version='rhel-7')

+     # The rest passed

+     for testcase_name in all_rpmdiff_testcase_names[1:]:

+         testdatabuilder.create_result(item=nvr,

+                                       testcase_name=testcase_name,

+                                       outcome='PASSED')

+     data = {

+         'decision_context': 'errata_newfile_to_qe',

+         'product_version': 'rhel-7',

+         'subject': [{'item': nvr, 'type': 'koji_build'}]

+     }

+     r = requests_session.post(greenwave_server.url + 'api/v1.0/decision',

+                               headers={'Content-Type': 'application/json'},

+                               data=json.dumps(data))

+     assert r.status_code == 200

+     res_data = r.json()

+     assert res_data['policies_satisified'] is True

+     # Ignore the waiver

+     data.update({

+         'ignore_waiver': [waiver['id']]

+     })

+     r = requests_session.post(greenwave_server.url + 'api/v1.0/decision',

+                               headers={'Content-Type': 'application/json'},

+                               data=json.dumps(data))

+     assert r.status_code == 200

+     res_data = r.json()

+     expected_unsatisfied_requirements = [

+         {

+             'item': {'item': nvr, 'type': 'koji_build'},

+             'result_id': result['id'],

+             'testcase': all_rpmdiff_testcase_names[0],

+             'type': 'test-result-failed'

+         },

+     ]

+     assert res_data['policies_satisified'] is False

+     assert res_data['unsatisfied_requirements'] == expected_unsatisfied_requirements

file modified
+13 -4
@@ -159,6 +159,10 @@ 

      :jsonparam list subject: A list of items about which the caller is requesting a decision

          used for querying ResultsDB. Each item contains one or more key-value pairs of 'data' key

          in ResultsDB API. For example, [{"type": "koji_build", "item": "xscreensaver-5.37-3.fc27"}].

+     :jsonparam list ignore_result: A list of result ids that will be ignored when making

+         the decision.

+     :jsonparam list ignore_waiver: A list of waiver ids that will be ignored when making

+         the decision.

      :statuscode 200: A decision was made.

      :statuscode 400: Invalid data was given.

      """
@@ -174,26 +178,31 @@ 

              raise BadRequest('Missing required subject')

      else:

          raise UnsupportedMediaType('No JSON payload in request')

-     if not isinstance(request.get_json()['subject'], list):

+     data = request.get_json()

+     if not isinstance(data['subject'], list):

          raise BadRequest('Invalid subject, must be a list of items')

-     product_version = request.get_json()['product_version']

-     decision_context = request.get_json()['decision_context']

+     product_version = data['product_version']

+     decision_context = data['decision_context']

+     ignore_results = data.get('ignore_result', [])

+     ignore_waivers = data.get('ignore_waiver', [])

      applicable_policies = [policy for policy in current_app.config['policies']

                             if policy.applies_to(decision_context, product_version)]

      if not applicable_policies:

          raise NotFound(

              'Cannot find any applicable policies for %s and %s' % (

                  product_version, decision_context))

-     subjects = [item for item in request.get_json()['subject'] if isinstance(item, dict)]

+     subjects = [item for item in data['subject'] if isinstance(item, dict)]

      if not subjects:

          raise BadRequest('Invalid subject, must be a list of dicts')

      answers = []

      for item in subjects:

          results = retrieve_results(item)

+         results = [r for r in results if r['id'] not in ignore_results]

          if results:

              waivers = retrieve_waivers(product_version, results)

          else:

              waivers = []

+         waivers = [w for w in waivers if w['id'] not in ignore_waivers]

          for policy in applicable_policies:

              answers.extend(policy.check(item, results, waivers))

      res = {

@@ -0,0 +1,3 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: GPL-2.0+

+ """Contains Greenwave's fedmsg consumers."""

@@ -0,0 +1,105 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: GPL-2.0+

+ """

+ The "resultsdb handler".

+ 

+ This module is responsible for listening for new results from ResultsDB. When a new

+ result is received, Greenwave will check all applicable policies for that item,

+ and if the new result causes the decision to change it will publish a message

+ to the message bus about the newly satisfied/unsatisfied policy.

+ """

+ 

+ import logging

+ import requests

+ import json

+ import fedmsg.consumers

+ 

+ from greenwave.utils import load_config

+ 

+ requests_session = requests.Session()

+ 

+ 

+ log = logging.getLogger(__name__)

+ 

+ 

+ class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer):

+     """

+     Handle a new result.

+ 

+     Attributes:

+         topic (list): A list of strings that indicate which fedmsg topics this consumer listens to.

+     """

+ 

+     config_key = 'resultsdb_handler'

+ 

+     def __init__(self, hub, *args, **kwargs):

+         """

+         Initialize the ResultsDBHandler, subscribing it to the appropriate topics.

+ 

+         Args:

+             hub (moksha.hub.hub.CentralMokshaHub): The hub from which this handler is consuming

+                 messages. It is used to look up the hub config.

+         """

+ 

+         prefix = hub.config.get('topic_prefix')

+         env = hub.config.get('environment')

+         self.topic = [

+             prefix + '.' + env + '.taskotron.result.new',

+         ]

+         self.fedmsg_config = fedmsg.config.load_config()

+         super(ResultsDBHandler, self).__init__(hub, *args, **kwargs)

+         log.info('Greenwave resultsdb handler listening on:\n'

+                  '%s' % self.topic)

+ 

+     def consume(self, message):

+         """

+         Process the given message and publish a message if the decision is changed.

+ 

+         Args:

+             message (munch.Munch): A fedmsg about a new result.

+         """

+         log.debug('Processing message "{0}"'.format(message))

+         msg = message['msg']

+         task = msg['task']

+         testcase = task['name']

+         del task['name']

+         config = load_config()

+         applicable_policies = []

+         for policy in config['policies']:

+             for rule in policy.rules:

+                 if rule.test_case_name == testcase:
ralph commented 6 years ago

Interesting. This assumes that all of our rules will have a test_case_name, which they currently do. But will they always? We could look at abstracting this interface later. :+1: to keep it for now.

mjia commented 6 years ago

Yeah, we can always adjust this if we need to introduce some rules that do not have a test_case_name.

+                     applicable_policies.append(policy)

+         for policy in applicable_policies:

+             for product_version in policy.product_versions:

+                 data = {

+                     'decision_context': policy.decision_context,

+                     'product_version': product_version,

+                     'subject': [task],

+                 }

+                 response = requests_session.post(

+                     self.fedmsg_config['greenwave_api_url'] + '/decision',

+                     headers={'Content-Type': 'application/json'},

+                     data=json.dumps(data))

+                 response.raise_for_status()

+                 decision = response.json()

+                 # get old decision

+                 data.update({

+                     'ignore_result': [msg['result']['id']],

+                 })

+                 response = requests_session.post(

+                     self.fedmsg_config['greenwave_api_url'] + '/decision',

+                     headers={'Content-Type': 'application/json'},

+                     data=json.dumps(data))

+                 response.raise_for_status()

+                 old_decision = response.json()

+                 if decision != old_decision:

+                     msg = decision

+                     decision.update({

+                         'subject': [task],

+                         'decision_context': policy.decision_context,

+                         'product_version': product_version,

+                         'previous': old_decision,

+                     })

+                     log.debug('Emitted a fedmsg, %r, on the "%s" topic', msg,

+                               'greenwave.decision.update')

+                     fedmsg.publish(topic='greenwave.decision.update', msg=msg)

@@ -0,0 +1,110 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: GPL-2.0+

+ """

+ The "waiverdb handler".

+ 

+ This module is responsible for listening for new waivers from WaiverDB. When a new

+ waiver is received, Greenwave will check all applicable policies for that waiver,

+ and if the new waiver causes the decision to change it will publish a message

+ to the message bus about the newly satisfied/unsatisfied policy.

+ """

+ 

+ import logging

+ import requests

+ import json

+ import fedmsg.consumers

+ 

+ from greenwave.utils import load_config

+ 

+ requests_session = requests.Session()

+ 

+ 

+ log = logging.getLogger(__name__)

+ 

+ 

+ class WaiverDBHandler(fedmsg.consumers.FedmsgConsumer):

+     """

+     Handle a new waiver.

+ 

+     Attributes:

+         topic (list): A list of strings that indicate which fedmsg topics this consumer listens to.

+     """

+ 

+     config_key = 'waiverdb_handler'

+ 

+     def __init__(self, hub, *args, **kwargs):

+         """

+         Initialize the WaiverDBHandler, subscribing it to the appropriate topics.

+ 

+         Args:

+             hub (moksha.hub.hub.CentralMokshaHub): The hub from which this handler is consuming

+                 messages. It is used to look up the hub config.

+         """

+ 

+         prefix = hub.config.get('topic_prefix')

+         env = hub.config.get('environment')

+         self.topic = [

+             prefix + '.' + env + '.waiver.new',

+         ]

+         self.fedmsg_config = fedmsg.config.load_config()

+ 

+         super(WaiverDBHandler, self).__init__(hub, *args, **kwargs)

+         log.info('Greenwave waiverdb handler listening on:\n'

+                  '%s' % self.topic)

+ 

+     def consume(self, message):

+         """

+         Process the given message and publish a message if the decision is changed.

+ 

+         Args:

+             message (munch.Munch): A fedmsg about a new waiver.

+         """

+         log.debug('Processing message "{0}"'.format(message))

+         msg = message['msg']

+         result_id = msg['result_id']

+         product_version = msg['product_version']

+         config = load_config()

+         timeout = config['REQUESTS_TIMEOUT']

+         # Get the waived result to figure out the item

+         response = requests_session.get(

+             config['RESULTSDB_API_URL'] + '/results/%d' % result_id,

+             timeout=timeout)

+         response.raise_for_status()

+         testcase = response.json()['testcase']['name']

+         item = response.json()['data']

+         for policy in config['policies']:

+             for rule in policy.rules:

+                 if rule.test_case_name == testcase:

+                     data = {

+                         'decision_context': policy.decision_context,

+                         'product_version': product_version,

+                         'subject': [item],

+                     }

+                     response = requests_session.post(

+                         self.fedmsg_config['greenwave_api_url'] + '/decision',

+                         headers={'Content-Type': 'application/json'},

+                         data=json.dumps(data))

+                     decision = response.json()

+ 

+                     # get old decision

+                     data.update({

+                         'ignore_waiver': [msg['id']],

+                     })

+                     response = requests_session.post(

+                         self.fedmsg_config['greenwave_api_url'] + '/decision',

+                         headers={'Content-Type': 'application/json'},

+                         data=json.dumps(data))

+                     response.raise_for_status()

+                     old_decision = response.json()

+ 

+                     if decision != old_decision:

+                         msg = decision

+                         decision.update({

+                             'subject': [item],

+                             'decision_context': policy.decision_context,

+                             'product_version': product_version,

+                             'previous': old_decision,

+                         })

+                         log.debug('Emitted a fedmsg, %r, on the "%s" topic', msg,

+                                   'greenwave.decision.update')

+                         fedmsg.publish(topic='greenwave.decision.update', msg=msg)

file modified
+1
@@ -2,3 +2,4 @@ 

  requests

  PyYAML

  dogpile.cache

+ fedmsg

no initial comment

I'm not quite sure about the message syntax. Currently, I'm using a syntax like this:

{ 'policies_satisified': False, 'decision_context': 'bodhi_update_push_stable', 'unsatisfied_requirements': [ { 'testcase': 'dist.abicheck', 'item': { 'item': nvr, 'type': 'koji_build' }, 'type': 'test-result-missing' }, { 'testcase': 'dist.upgradepath', 'item': { 'item': nvr, 'type': 'koji_build' }, 'type': 'test-result-missing' } ], 'summary': '2 of 3 required tests not found', 'product_version': 'fedora-26', 'subject': [ { 'item': nvr, 'type': 'koji_build' } ], 'applicable_policies': ['taskotron_release_critical_tasks'] }

This looks good to me.

Do these only get published when the satisfaction changes from False to True or from True to False? Or do they get published for every waiver and every result?

Maybe it could be interesting to have a msg that contains the old decision and the new decision so that a listener could act on the delta, somehow.

Yeah, I'm going to do it in another patch. In this patch, they get published for every result and every waiver. What would the best syntax be for containing the old decision and the new decision? I guess we can use a new key 'pre_decision'?

Sure. I'd vote for just using previous instead of pre_decision.

Damn it! I've forgotten to commit a directory including the consumers in this patch. The worst thing is I can't find it from my local git repo. So please do not review and I will post a new update when I find time.

Sorry, I just came back from PTO. I will start reworking this PR.

rebased onto 232c7cd548f896b599fde7257353f568861fec08

6 years ago

Okay, I have managed to bring these missing consumers back and this PR is ready now for another round.

As I understand, if the results and waivers are cached as described in PR#84, I should be able to use these cached results and waivers to get the old decision. Correct me if I am wrong, :-P

Interesting. I see this is re-implementing the logic of the /decision endpoint.

What do you think about having this make an actual HTTP POST to the greenwave web interface? That way, we could keep only one stretch of code that handles computing a decision.

In general, this looks like a good start. A few things:

  • Consider querying the HTTP interface of greenwave to get the decision instead of making the decision in the code here. Maybe that's too indirect.. but it could be nice to try and de-duplicate the decision logic.
  • We'll have to figure out how to compute the "old decision".
  • In a world where this code queries the greenwave web interface to get the decision, I imagine it making two queries. One, normal decision query, and a second decision query that specifies additionally that the decision should ignore the given result_id or waiver_id.
  • Once the backend has both the old decision and the new decision (however it gets those, via its own computation, or via a function that it shares with the frontend, or via an http post call..), then it can compare the old decision and the new decision. I think it should only publish a fedmsg message if the old message is different from the new message. I.e., if there's a change.

Yeah, you are right. Somehow I was trying to avoid sending indirect requests.

@ralph, thank you very much for your review. I like the idea of how to compute the "old version".

rebased onto 2a580b3

6 years ago

Rebased to address the feedback and it is ready for another look, :-P

Interesting. This assumes that all of our rules will have a test_case_name, which they currently do. But will they always? We could look at abstracting this interface later. :+1: to keep it for now.

This looks awesome.

Running the tests now.

32 passed, 3 warnings in 5.97 seconds

Looks good! Merging.

We'll need to figure out how we actually deploy this in openshift (it is effectively optional, fwiw. the frontend continues to work without it.)

We'll need some kind of side container for the messaging daemon.

Pull-Request has been merged by ralph

6 years ago

Yeah, we can always adjust this if we need to introduce some rules that do not have a test_case_name.