#392 Start porting to fedora-messaging
Merged 5 years ago by mprahl. Opened 5 years ago by pingou.
pingou/greenwave fedora_messaging  into  master

file added
+67
@@ -0,0 +1,67 @@ 

+ # A sample configuration for fedora-messaging. This file is in the TOML format.

+ # For complete details on all configuration options, see the documentation

+ # https://fedora-messaging.readthedocs.io/en/latest/configuration.html.

+ 

+ amqp_url = "amqp://"

+ 

+ publish_exchange = "amq.topic"

+ 

+ callback = "greenwave.consumers.fedora_messaging_consumer:fedora_messaging_callback"

+ 

+ # Note the double brackets below.

+ # To add another binding, add another [[bindings]] section.

+ [[bindings]]

+ queue = "greenwave"

+ exchange = "amq.topic"

+ routing_keys = [

+     "org.fedoraproject.prod.taskotron.result.new",

+     "org.fedoraproject.stg.taskotron.result.new",

+     "org.fedoraproject.prod.waiver.new",

+     "org.fedoraproject.stg.waiver.new",

+ ]

+ 

+ [tls]

+ ca_cert = "/etc/pki/tls/certs/ca-bundle.crt"

+ keyfile = "/my/client/key.pem"

+ certfile = "/my/client/cert.pem"

+ 

+ [client_properties]

+ app = "greenwave"

+ 

+ [queues.greenwave]

+ durable = true

+ auto_delete = false

+ exclusive = false

+ arguments = {}

+ 

+ [qos]

+ prefetch_size = 0

+ prefetch_count = 25

+ 

+ [log_config]

+ version = 1

+ disable_existing_loggers = true

+ 

+ [log_config.formatters.simple]

+ format = "[%(name)s %(levelname)s] %(message)s"

+ 

+ [log_config.handlers.console]

+ class = "logging.StreamHandler"

+ formatter = "simple"

+ stream = "ext://sys.stdout"

+ 

+ [log_config.loggers.fedora_messaging]

+ level = "INFO"

+ propagate = false

+ handlers = ["console"]

+ 

+ [log_config.root]

+ level = "WARNING"

+ handlers = ["console"]

+ 

+ # greenwave consumer configuration

+ [consumer_config]

+ topic_prefix = 'org.fedoraproject'

+ environment = 'dev'

+ waiverdb_topic_suffix = 'waiver.new'

+ resultsdb_topic_suffix = 'taskotron.result.new'

file modified
+2
@@ -31,6 +31,8 @@ 

  

      POLICIES_DIR = '/etc/greenwave/policies'

  

+     MESSAGING = 'fedmsg'

+ 

      # By default, don't cache anything.

      CACHE = {'backend': 'dogpile.cache.null'}

  

@@ -0,0 +1,69 @@ 

+ # -*- coding: utf-8 -*-

+ # SPDX-License-Identifier: GPL-2.0+

+ """

+ The fedora-messaging consumer.

+ 

+ This module is responsible for consuming messages sent to the fedora message bus via

+ fedora-messaging.

+ It will get all the messages and pass them onto their appropriate fedmsg

+ consumers to re-use the same code path.

+ """

+ 

+ import logging

+ 

+ from greenwave.consumers.resultsdb import ResultsDBHandler

+ from greenwave.consumers.waiverdb import WaiverDBHandler

+ 

+ from fedora_messaging.config import conf

+ 

+ 

+ log = logging.getLogger(__name__)

+ 

+ 

+ class Dummy(object):

+     """ Dummy object only storing a dictionary named "config" that can be passed

+     onto the fedmsg consumer.

+     """

+ 

+     def __init__(self, config):

+         self.config = config

+ 

+ 

+ def fedora_messaging_callback(message):

+     """

+     Callback called when messages from fedora-messaging are received.

+     It then passes them onto their appropriate fedmsg handler for code

+     portability.

+ 

+     Args:

+         message (fedora_messaging.message.Message): The message we received

+             from the queue.

+     """

+     log.info(

+         'Received message from fedora-messaging with topic: %s', message.topic)

+     consumer_config = conf["consumer_config"]

+     if message.topic.endswith("taskotron.result.new"):

+         # New resultsdb results

+         config = {

+             "topic_prefix": consumer_config["topic_prefix"],

+             "environment": consumer_config["environment"],

+             "resultsdb_topic_suffix": consumer_config["resultsdb_topic_suffix"]

+         }

+         hub = Dummy(config)

+         handler = ResultsDBHandler(hub)

+         msg = {"body": {'msg': message.body}}

+         log.info('Sending message received to: ResultsDBHandler')

+         handler.consume(msg)

+ 

+     elif message.topic.endswith('waiver.new'):

+         # New waiver submitted

+         config = {

+             "topic_prefix": consumer_config["topic_prefix"],

+             "environment": consumer_config["environment"],

+             "waiverdb_topic_suffix": consumer_config["waiverdb_topic_suffix"]

+         }

+         hub = Dummy(config)

+         handler = WaiverDBHandler(hub)

+         msg = {"body": {'msg': message.body}}

+         log.info('Sending message received to: WaiverDBHandler')

+         handler.consume(msg)

@@ -24,6 +24,12 @@ 

  

  import xmlrpc.client

  

+ try:

+     import fedora_messaging.api

+     import fedora_messaging.exceptions

+ except ImportError:

+     pass

+ 

  

  log = logging.getLogger(__name__)

  
@@ -264,6 +270,8 @@ 

              testcase=testcase,

              product_version=product_version)

  

+         log.info('Getting greenwave info')

+ 

          for decision_context, product_version in sorted(contexts_product_versions):

              greenwave_url = self.fedmsg_config['greenwave_api_url'] + '/decision'

  
@@ -275,6 +283,7 @@ 

              }

  

              try:

+                 log.debug('querying greenwave at: %s', greenwave_url)

                  decision = greenwave.resources.retrieve_decision(greenwave_url, data)

  

                  # get old decision
@@ -282,6 +291,7 @@ 

                      'ignore_result': [result_id],

                  })

                  old_decision = greenwave.resources.retrieve_decision(greenwave_url, data)

+                 log.debug('old decision: %s', old_decision)

              except requests.exceptions.HTTPError as e:

                  log.exception('Failed to retrieve decision for data=%s, error: %s', data, e)

                  continue
@@ -299,6 +309,25 @@ 

                      'product_version': product_version,

                      'previous': old_decision,

                  })

-                 log.debug('Emitted a fedmsg, %r, on the "%s" topic', decision,

-                           'greenwave.decision.update')

-                 fedmsg.publish(topic='decision.update', msg=decision)

+                 log.info(

+                     'Emitted a message on the bus, %r, with the topic '

+                     '"greenwave.decision.update"', decision)

+                 if self.flask_app.config['MESSAGING'] == 'fedmsg':

+                     log.debug('  - to fedmsg')

+                     fedmsg.publish(topic='decision.update', msg=decision)

+                 elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':

+                     log.debug('  - to fedora-messaging')

+                     try:

+                         msg = fedora_messaging.api.Message(

+                             topic='greenwave.decision.update',

+                             body=decision

+                         )

+                         fedora_messaging.api.publish(msg)

+                     except fedora_messaging.exceptions.PublishReturned as e:

+                         log.warning(

+                             'Fedora Messaging broker rejected message %s: %s',

+                             msg.id, e)

+                     except fedora_messaging.exceptions.ConnectionException as e:

+                         log.warning('Error sending message %s: %s', msg.id, e)

+                     except Exception:  # pylint: disable=broad-except

+                         log.exception('Error sending fedora-messaging message')

@@ -20,6 +20,13 @@ 

  from greenwave.monitoring import publish_decision_exceptions_waiver_counter

  from greenwave.policies import applicable_decision_context_product_version_pairs

  

+ try:

+     import fedora_messaging.api

+     import fedora_messaging.exceptions

+ except ImportError:

+     pass

+ 

+ 

  requests_session = requests.Session()

  

  
@@ -135,6 +142,25 @@ 

                      'product_version': product_version,

                      'previous': old_decision,

                  })

-                 log.debug('Emitted a fedmsg, %r, on the "%s" topic', msg,

-                           'greenwave.decision.update')

-                 fedmsg.publish(topic='decision.update', msg=msg)

+                 log.info(

+                     'Emitted a message on the bus, %r, with the topic '

+                     '"greenwave.decision.update"', decision)

+                 if self.flask_app.config['MESSAGING'] == 'fedmsg':

+                     log.debug('  - to fedmsg')

+                     fedmsg.publish(topic='decision.update', msg=msg)

+                 elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':

+                     log.debug('  - to fedora-messaging')

+                     try:

+                         msg = fedora_messaging.api.Message(

+                             topic='greenwave.decision.update',

+                             body=msg

+                         )

+                         fedora_messaging.api.publish(msg)

+                     except fedora_messaging.exceptions.PublishReturned as e:

+                         log.warning(

+                             'Fedora Messaging broker rejected message %s: %s',

+                             msg.id, e)

+                     except fedora_messaging.exceptions.ConnectionException as e:

+                         log.warning('Error sending message %s: %s', msg.id, e)

+                     except Exception:  # pylint: disable=broad-except

+                         log.exception('Error sending fedora-messaging message')

@@ -1,6 +1,7 @@ 

  # SPDX-License-Identifier: GPL-2.0+

  

  import mock

+ import pytest

  

  from textwrap import dedent

  
@@ -109,185 +110,201 @@ 

      assert subjects == []

  

  

+ parameters = [

+     ('fedmsg', 'greenwave.consumers.resultsdb.fedmsg.publish'),

+     ('fedora-messaging', 'greenwave.consumers.resultsdb.fedora_messaging.api.publish'),

+ ]

+ 

+ @pytest.mark.parametrize("config,publish", parameters)

  @mock.patch('greenwave.resources.ResultsRetriever.retrieve')

  @mock.patch('greenwave.resources.retrieve_decision')

  @mock.patch('greenwave.resources.retrieve_scm_from_koji')

  @mock.patch('greenwave.resources.retrieve_yaml_remote_rule')

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

  def test_remote_rule_decision_change(

-         mock_fedmsg,

          mock_retrieve_yaml_remote_rule,

          mock_retrieve_scm_from_koji,

          mock_retrieve_decision,

-         mock_retrieve_results):

+         mock_retrieve_results,

+         config,

+         publish):

      """

      Test publishing decision change message for test cases mentioned in

      gating.yaml.

      """

-     # gating.yaml

-     gating_yaml = dedent("""

-         --- !Policy

-         product_versions: [fedora-rawhide, notexisting_prodversion]

-         decision_context: test_context

-         rules:

-           - !PassingTestCaseRule {test_case_name: dist.rpmdeplint}

-     """)

-     mock_retrieve_yaml_remote_rule.return_value = gating_yaml

- 

-     policies = dedent("""

-         --- !Policy

-         id: test_policy

-         product_versions: [fedora-rawhide]

-         decision_context: test_context

-         subject_type: koji_build

-         rules:

-           - !RemoteRule {}

-     """)

- 

-     nvr = 'nethack-1.2.3-1.rawhide'

-     result = {

-         'id': 1,

-         'testcase': {'name': 'dist.rpmdeplint'},

-         'outcome': 'PASSED',

-         'data': {'item': nvr, 'type': 'koji_build'},

-     }

-     mock_retrieve_results.return_value = [result]

- 

-     def retrieve_decision(url, data):

-         #pylint: disable=unused-argument

-         if 'ignore_result' in data:

-             return None

-         return {}

-     mock_retrieve_decision.side_effect = retrieve_decision

-     mock_retrieve_scm_from_koji.return_value = ('rpms', nvr,

-                                                 'c3c47a08a66451cb9686c49f040776ed35a0d1bb')

- 

-     message = {

-         'body': {

-             'topic': 'resultsdb.result.new',

-             'msg': {

-                 'id': result['id'],

+     with mock.patch('greenwave.config.Config.MESSAGING', config):

+         with mock.patch(publish) as mock_fedmsg:

+             # gating.yaml

+             gating_yaml = dedent("""

+                 --- !Policy

+                 product_versions: [fedora-rawhide, notexisting_prodversion]

+                 decision_context: test_context

+                 rules:

+                   - !PassingTestCaseRule {test_case_name: dist.rpmdeplint}

+             """)

+             mock_retrieve_yaml_remote_rule.return_value = gating_yaml

+ 

+             policies = dedent("""

+                 --- !Policy

+                 id: test_policy

+                 product_versions: [fedora-rawhide]

+                 decision_context: test_context

+                 subject_type: koji_build

+                 rules:

+                   - !RemoteRule {}

+             """)

+ 

+             nvr = 'nethack-1.2.3-1.rawhide'

+             result = {

+                 'id': 1,

+                 'testcase': {'name': 'dist.rpmdeplint'},

                  'outcome': 'PASSED',

-                 'testcase': {

-                     'name': 'dist.rpmdeplint',

-                 },

-                 'data': {

-                     'item': [nvr],

-                     'type': ['koji_build'],

+                 'data': {'item': nvr, 'type': 'koji_build'},

+             }

+             mock_retrieve_results.return_value = [result]

+ 

+             def retrieve_decision(url, data):

+                 #pylint: disable=unused-argument

+                 if 'ignore_result' in data:

+                     return None

+                 return {}

+             mock_retrieve_decision.side_effect = retrieve_decision

+             mock_retrieve_scm_from_koji.return_value = ('rpms', nvr,

+                                                         'c3c47a08a66451cb9686c49f040776ed35a0d1bb')

+ 

+             message = {

+                 'body': {

+                     'topic': 'resultsdb.result.new',

+                     'msg': {

+                         'id': result['id'],

+                         'outcome': 'PASSED',

+                         'testcase': {

+                             'name': 'dist.rpmdeplint',

+                         },

+                         'data': {

+                             'item': [nvr],

+                             'type': ['koji_build'],

+                         }

+                     }

                  }

              }

-         }

-     }

-     hub = mock.MagicMock()

-     hub.config = {

-         'environment': 'environment',

-         'topic_prefix': 'topic_prefix',

-     }

-     handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub)

- 

-     handler.flask_app.config['policies'] = Policy.safe_load_all(policies)

-     with handler.flask_app.app_context():

-         handler.consume(message)

- 

-     assert len(mock_fedmsg.mock_calls) == 1

- 

-     mock_call = mock_fedmsg.mock_calls[0][2]

-     assert mock_call['topic'] == 'decision.update'

- 

-     actual_msgs_sent = [mock_call['msg'] for call in mock_fedmsg.mock_calls]

-     assert actual_msgs_sent[0] == {

-         'decision_context': 'test_context',

-         'product_version': 'fedora-rawhide',

-         'subject': [

-             {'item': nvr, 'type': 'koji_build'},

-         ],

-         'subject_type': 'koji_build',

-         'subject_identifier': nvr,

-         'previous': None,

-     }

+             hub = mock.MagicMock()

+             hub.config = {

+                 'environment': 'environment',

+                 'topic_prefix': 'topic_prefix',

+             }

+             handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub)

+ 

+             handler.flask_app.config['policies'] = Policy.safe_load_all(policies)

+             with handler.flask_app.app_context():

+                 handler.consume(message)

+ 

+             assert len(mock_fedmsg.mock_calls) == 1

+ 

+             if config == "fedmsg":

+                 mock_call = mock_fedmsg.mock_calls[0][2]

+                 assert mock_call['topic'] == 'decision.update'

+                 actual_msgs_sent = mock_call['msg']

+             else:

+                 mock_call = mock_fedmsg.mock_calls[0][1][0]

+                 assert mock_call.topic == 'greenwave.decision.update'

+                 actual_msgs_sent = mock_call.body

+ 

+             assert actual_msgs_sent == {

+                 'decision_context': 'test_context',

+                 'product_version': 'fedora-rawhide',

+                 'subject': [

+                     {'item': nvr, 'type': 'koji_build'},

+                 ],

+                 'subject_type': 'koji_build',

+                 'subject_identifier': nvr,

+                 'previous': None,

+             }

  

  

+ @pytest.mark.parametrize("config,publish", parameters)

  @mock.patch('greenwave.resources.ResultsRetriever.retrieve')

  @mock.patch('greenwave.resources.retrieve_decision')

  @mock.patch('greenwave.resources.retrieve_scm_from_koji')

  @mock.patch('greenwave.resources.retrieve_yaml_remote_rule')

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

  def test_remote_rule_decision_change_not_matching(

-         mock_fedmsg,

          mock_retrieve_yaml_remote_rule,

          mock_retrieve_scm_from_koji,

          mock_retrieve_decision,

-         mock_retrieve_results):

+         mock_retrieve_results,

+         config,

+         publish):

      """

      Test publishing decision change message for test cases mentioned in

      gating.yaml.

      """

-     # gating.yaml

-     gating_yaml = dedent("""

-         --- !Policy

-         product_versions: [fedora-rawhide]

-         decision_context: test_context

-         rules:

-           - !PassingTestCaseRule {test_case_name: dist.rpmdeplint}

-     """)

-     mock_retrieve_yaml_remote_rule.return_value = gating_yaml

- 

-     policies = dedent("""

-         --- !Policy

-         id: test_policy

-         product_versions: [fedora-rawhide]

-         decision_context: another_test_context

-         subject_type: koji_build

-         rules:

-           - !RemoteRule {}

-     """)

- 

-     nvr = 'nethack-1.2.3-1.rawhide'

-     result = {

-         'id': 1,

-         'testcase': {'name': 'dist.rpmdeplint'},

-         'outcome': 'PASSED',

-         'data': {'item': nvr, 'type': 'koji_build'},

-     }

-     mock_retrieve_results.return_value = [result]

- 

-     def retrieve_decision(url, data):

-         #pylint: disable=unused-argument

-         if 'ignore_result' in data:

-             return None

-         return {}

-     mock_retrieve_decision.side_effect = retrieve_decision

-     mock_retrieve_scm_from_koji.return_value = ('rpms', nvr,

-                                                 'c3c47a08a66451cb9686c49f040776ed35a0d1bb')

- 

-     message = {

-         'body': {

-             'topic': 'resultsdb.result.new',

-             'msg': {

-                 'id': result['id'],

+     with mock.patch('greenwave.config.Config.MESSAGING', config):

+         with mock.patch(publish) as mock_fedmsg:

+             # gating.yaml

+             gating_yaml = dedent("""

+                 --- !Policy

+                 product_versions: [fedora-rawhide]

+                 decision_context: test_context

+                 rules:

+                   - !PassingTestCaseRule {test_case_name: dist.rpmdeplint}

+             """)

+             mock_retrieve_yaml_remote_rule.return_value = gating_yaml

+ 

+             policies = dedent("""

+                 --- !Policy

+                 id: test_policy

+                 product_versions: [fedora-rawhide]

+                 decision_context: another_test_context

+                 subject_type: koji_build

+                 rules:

+                   - !RemoteRule {}

+             """)

+ 

+             nvr = 'nethack-1.2.3-1.rawhide'

+             result = {

+                 'id': 1,

+                 'testcase': {'name': 'dist.rpmdeplint'},

                  'outcome': 'PASSED',

-                 'testcase': {

-                     'name': 'dist.rpmdeplint',

-                 },

-                 'data': {

-                     'item': [nvr],

-                     'type': ['koji_build'],

+                 'data': {'item': nvr, 'type': 'koji_build'},

+             }

+             mock_retrieve_results.return_value = [result]

+ 

+             def retrieve_decision(url, data):

+                 #pylint: disable=unused-argument

+                 if 'ignore_result' in data:

+                     return None

+                 return {}

+             mock_retrieve_decision.side_effect = retrieve_decision

+             mock_retrieve_scm_from_koji.return_value = ('rpms', nvr,

+                                                         'c3c47a08a66451cb9686c49f040776ed35a0d1bb')

+ 

+             message = {

+                 'body': {

+                     'topic': 'resultsdb.result.new',

+                     'msg': {

+                         'id': result['id'],

+                         'outcome': 'PASSED',

+                         'testcase': {

+                             'name': 'dist.rpmdeplint',

+                         },

+                         'data': {

+                             'item': [nvr],

+                             'type': ['koji_build'],

+                         }

+                     }

                  }

              }

-         }

-     }

-     hub = mock.MagicMock()

-     hub.config = {

-         'environment': 'environment',

-         'topic_prefix': 'topic_prefix',

-     }

-     handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub)

+             hub = mock.MagicMock()

+             hub.config = {

+                 'environment': 'environment',

+                 'topic_prefix': 'topic_prefix',

+             }

+             handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub)

  

-     handler.flask_app.config['policies'] = Policy.safe_load_all(policies)

-     with handler.flask_app.app_context():

-         handler.consume(message)

+             handler.flask_app.config['policies'] = Policy.safe_load_all(policies)

+             with handler.flask_app.app_context():

+                 handler.consume(message)

  

-     assert len(mock_fedmsg.mock_calls) == 0

+             assert len(mock_fedmsg.mock_calls) == 0

  

  

  def test_guess_product_version():
@@ -308,103 +325,111 @@ 

          assert product_version == 'rhel-8'

  

  

+ @pytest.mark.parametrize("config,publish", parameters)

  @mock.patch('greenwave.resources.ResultsRetriever.retrieve')

  @mock.patch('greenwave.resources.retrieve_decision')

  @mock.patch('greenwave.resources.retrieve_scm_from_koji')

  @mock.patch('greenwave.resources.retrieve_yaml_remote_rule')

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

  def test_decision_change_for_modules(

-         mock_fedmsg,

          mock_retrieve_yaml_remote_rule,

          mock_retrieve_scm_from_koji,

          mock_retrieve_decision,

-         mock_retrieve_results):

+         mock_retrieve_results,

+         config,

+         publish):

      """

      Test publishing decision change message for a module.

      """

- 

-     # gating.yaml

-     gating_yaml = dedent("""

-         --- !Policy

-         product_versions:

-           - rhel-8

-         decision_context: osci_compose_gate_modules

-         subject_type: redhat-module

-         rules:

-           - !PassingTestCaseRule {test_case_name: baseos-ci.redhat-module.tier1.functional}

-     """)

-     mock_retrieve_yaml_remote_rule.return_value = gating_yaml

- 

-     policies = dedent("""

-     --- !Policy

-         id: "osci_compose_modules"

-         product_versions:

-           - rhel-8

-         decision_context: osci_compose_gate_modules

-         subject_type: redhat-module

-         blacklist: []

-         rules:

-           - !RemoteRule {}

-     """)

- 

-     nsvc = 'python36-3.6-820181204160430.17efdbc7'

-     result = {

-         'id': 1,

-         'testcase': {'name': 'baseos-ci.redhat-module.tier1.functional'},

-         'outcome': 'PASSED',

-         'data': {'item': nsvc, 'type': 'redhat-module'},

-     }

-     mock_retrieve_results.return_value = [result]

- 

-     def retrieve_decision(url, data):

-         #pylint: disable=unused-argument

-         if 'ignore_result' in data:

-             return None

-         return {}

-     mock_retrieve_decision.side_effect = retrieve_decision

-     mock_retrieve_scm_from_koji.return_value = ('modules', nsvc,

-                                                 '97273b80dd568bd15f9636b695f6001ecadb65e0')

- 

-     message = {

-         'body': {

-             'topic': 'resultsdb.result.new',

-             'msg': {

-                 'id': result['id'],

+     with mock.patch('greenwave.config.Config.MESSAGING', config):

+         with mock.patch(publish) as mock_fedmsg:

+ 

+             # gating.yaml

+             gating_yaml = dedent("""

+                 --- !Policy

+                 product_versions:

+                   - rhel-8

+                 decision_context: osci_compose_gate_modules

+                 subject_type: redhat-module

+                 rules:

+                   - !PassingTestCaseRule {test_case_name: baseos-ci.redhat-module.tier1.functional}

+             """)

+             mock_retrieve_yaml_remote_rule.return_value = gating_yaml

+ 

+             policies = dedent("""

+             --- !Policy

+                 id: "osci_compose_modules"

+                 product_versions:

+                   - rhel-8

+                 decision_context: osci_compose_gate_modules

+                 subject_type: redhat-module

+                 blacklist: []

+                 rules:

+                   - !RemoteRule {}

+             """)

+ 

+             nsvc = 'python36-3.6-820181204160430.17efdbc7'

+             result = {

+                 'id': 1,

+                 'testcase': {'name': 'baseos-ci.redhat-module.tier1.functional'},

                  'outcome': 'PASSED',

-                 'testcase': {

-                     'name': 'baseos-ci.redhat-module.tier1.functional',

-                 },

-                 'data': {

-                     'item': [nsvc],

-                     'type': ['redhat-module'],

+                 'data': {'item': nsvc, 'type': 'redhat-module'},

+             }

+             mock_retrieve_results.return_value = [result]

+ 

+             def retrieve_decision(url, data):

+                 #pylint: disable=unused-argument

+                 if 'ignore_result' in data:

+                     return None

+                 return {}

+             mock_retrieve_decision.side_effect = retrieve_decision

+             mock_retrieve_scm_from_koji.return_value = ('modules', nsvc,

+                                                         '97273b80dd568bd15f9636b695f6001ecadb65e0')

+ 

+             message = {

+                 'body': {

+                     'topic': 'resultsdb.result.new',

+                     'msg': {

+                         'id': result['id'],

+                         'outcome': 'PASSED',

+                         'testcase': {

+                             'name': 'baseos-ci.redhat-module.tier1.functional',

+                         },

+                         'data': {

+                             'item': [nsvc],

+                             'type': ['redhat-module'],

+                         }

+                     }

                  }

              }

-         }

-     }

-     hub = mock.MagicMock()

-     hub.config = {

-         'environment': 'environment',

-         'topic_prefix': 'topic_prefix',

-     }

-     handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub)

- 

-     handler.flask_app.config['policies'] = Policy.safe_load_all(policies)

-     with handler.flask_app.app_context():

-         handler.consume(message)

- 

-     assert len(mock_fedmsg.mock_calls) == 1

- 

-     mock_call = mock_fedmsg.mock_calls[0][2]

-     assert mock_call['topic'] == 'decision.update'

- 

-     actual_msgs_sent = [mock_call['msg'] for call in mock_fedmsg.mock_calls]

-     assert actual_msgs_sent[0] == {

-         'decision_context': 'osci_compose_gate_modules',

-         'product_version': 'rhel-8',

-         'subject': [

-             {'item': nsvc, 'type': 'redhat-module'},

-         ],

-         'subject_type': 'redhat-module',

-         'subject_identifier': nsvc,

-         'previous': None,

-     }

+             hub = mock.MagicMock()

+             hub.config = {

+                 'environment': 'environment',

+                 'topic_prefix': 'topic_prefix',

+             }

+             handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub)

+ 

+             handler.flask_app.config['policies'] = Policy.safe_load_all(policies)

+             with handler.flask_app.app_context():

+                 handler.consume(message)

+ 

+             assert len(mock_fedmsg.mock_calls) == 1

+ 

+             if config == "fedmsg":

+                 mock_call = mock_fedmsg.mock_calls[0][2]

+                 assert mock_call['topic'] == 'decision.update'

+                 actual_msgs_sent = mock_call['msg']

+             else:

+                 mock_call = mock_fedmsg.mock_calls[0][1][0]

+                 assert mock_call.topic == 'greenwave.decision.update'

+                 actual_msgs_sent = mock_call.body

+ 

+             assert actual_msgs_sent == {

+                 'decision_context': 'osci_compose_gate_modules',

+                 'product_version': 'rhel-8',

+                 'subject': [

+                     {'item': nsvc, 'type': 'redhat-module'},

+                 ],

+                 'subject_type': 'redhat-module',

+                 'subject_identifier': nsvc,

+                 'previous': None,

+             }

file modified
+1
@@ -3,4 +3,5 @@ 

  PyYAML

  dogpile.cache

  fedmsg[consumers]

+ fedora-messaging

  prometheus_client

Signed-off-by: Pierre-Yves Chibon pingou@pingoured.fr

rebased onto 5d5abd55b274931b52b407cbb0e325fc3166a04c

5 years ago

The commits can be reviewed separately. I can merge the commit porting to fedora-messaging with the one adding the tests if desired (I wanted to see whether CI would kick in on this PR and tell me whether this change alone had broken something).

The second commit, which fixes a typo in the test, could also likely be simplified: we're generating a list of messages sent but only checking the first one, and we've already extracted the information above when checking the topic. So if desired I can drop the list from there. I can also move this change to a separate PR if desired.

3 new commits added

  • Add unit-test checks that fedora-messaging is properly called when configured
  • Fix typos in the tests
  • Start porting to fedora-messaging
5 years ago

rebased onto 4addb75222859343f845336b361cb670cab22efe

5 years ago

1 new commit added

  • Add a fedora-messaging consumer and an example configuration file
5 years ago

I've tested the consumer using the following script:

from greenwave.consumers.resultsdb import ResultsDBHandler
from greenwave.consumers.waiverdb import WaiverDBHandler

import fedora_messaging.api
from fedora_messaging.config import conf

class Dummy(object):
    """Minimal stand-in for the hub object expected by a fedmsg consumer.

    It stores nothing but a dictionary named ``config``, so an instance can
    be passed onto the fedmsg consumer in place of a real hub.
    """

    def __init__(self, config):
        """Keep the given ``config`` dictionary available as ``self.config``."""
        self.config = config


# Minimal consumer settings; they reach ResultsDBHandler wrapped in the
# Dummy hub created below.
config = {
    "topic_prefix": "org.fedoraproject",
    "environment":  "dev",
    "resultsdb_topic_suffix": "taskotron.result.new"
}
hub = Dummy(config)
handler = ResultsDBHandler(hub)
# Hand-built sample message: a PASSED "dist.rpmgrill" result for a koji_build
# item, nested under body -> msg.
msg = {"body": {'msg': {
    "task": {
      "item": "git-secret-0.2.5-2.fc31", 
      "type": "koji_build", 
      "name": "dist.rpmgrill"
    }, 
    "result": {
      "prev_outcome": None, 
      "outcome": "PASSED", 
      "id": 27816938, 
      "submit_time": "2019-03-05 15:30:43 UTC", 
      "log_url": "https://taskotron.fedoraproject.org/artifacts/all/56eeecd4-3f5b-11e9-a73c-525400fc9f92/tests.yml/rpmgrill.json"
    }
  }}}
# Feed the sample message directly to the handler.
handler.consume(msg)

The code doesn't explode, it also doesn't complete because contexts_product_versions is [] so there is that :)

rebased onto a3c6040f87bd40492c02e2cef0b68a17ab36bf80

5 years ago

I guess you can move this one before the if, since you do the same in both branches (if and elif).

Mmm I think the bare except will fail in Flake8.

I'm not the best at English :D but shouldn't here be something like "Emitted a message on the message bus"? Or maybe "Emitted a message on the bus"?

Same here about the bare except.

I'm afraid our pylint would fail here: W0703: Catching too general exception Exception (broad-except)

Doesn't "fedora_messaging" need to be added to the requirements?

Could you write a test also for waiverdb consumer?

Doesn't "fedora_messaging" need to be added to the requirements?

I've tried to make it optional, I'm fine with making it mandatory if you prefer :)

I thought I'd need it but apparently not, dropping it :)

Shall I make pylint ignore it or just remove the except?

You haven't heard, fedora-messaging support bodhi now :-p

/me goes to fix

rebased onto c4e2f2d7246757a7c7e2ebacf8b6b357d620dbc2

5 years ago

rebased onto 5a2098073acd135e246e483d79ab31e1f9a6cbd3

5 years ago

Should these print calls be replaced with logging method calls?

You're right, either log or plain remove, which do you prefer?

I would like to have the important things to be logged.
Maybe we can:
- log.debug() --> not so important stuff
- log.info() --> interesting logging (example: message published, or message not published because...)

rebased onto f2cf0c7a240ac52f6ebfb41ef3c8494034980b81

5 years ago

Since this is a repeated pattern, I wonder if we can use a mixin class to provide this dynamic publish property.

class DynamicPublisherMixin(object):
  def dynamic_publish(self, topic, msg):
    ...

class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer, DynamicPublisherMixin):
  def _publish_decision_changes(...):
    ...
    self.dynamic_publish(topic, msg)

# And the same for WaiverDBHandler.

:+1: LGTM

My previous comment is simply a suggestion, not a blocker for this MR.

Nitpick: Could you use single quotes like the rest of the method?

Nitpick: Could you use single quotes like the rest of the method?

In my opinion, this validation should happen when the greenwave module is initialized, and upon failure, an exception should be raised. What do you think @gnaponie?

Why was this log message updated in this consumer but not the other?

Yeah, agreed, it would be preferable.

Why is this log message in this consumer but not the other?

Why is this log message in this consumer but not the other?

Same comment as in the other consumer

This seems very similar to the test above it. Could you use pytest parametrize instead? It'd make the maintenance of the tests easier.

Fixing the quotes here and above and in the two files

Do we have a mechanism in place for this? Do you want me to build one?
I guess we would basically like a mechanism checking the entire configuration file, so should we make this part of this PR or record this in a ticket and fix it in another PR?

@pingou How about you just remove the else and we can handle the configuration validation in a separate ticket/PR?

@pingou How about you just remove the else and we can handle the configuration validation in a separate ticket/PR?

I can most certainly do this :)

At this point I have all the changes lined up except for the pytest one. I'll drop the else clause and fix the pytest one tomorrow :)

Thanks!

rebased onto 5118a544c54a7259a59b5a0de5adc3019b3efe65

5 years ago

1 new commit added

  • Drop the warning about unsupported messaging setting
5 years ago

It seems fine to me. @mprahl could you review the latest changes?

rebased onto fa9593d

5 years ago

Looks good @pingou. Thanks for the PR!

Pull-Request has been merged by mprahl

5 years ago