From fa9593d36282ba2e39aefb15f12c1baa79821d2c Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 28 2019 11:27:54 +0000 Subject: [PATCH 1/5] Start porting to fedora-messaging Signed-off-by: Pierre-Yves Chibon --- diff --git a/greenwave/config.py b/greenwave/config.py index b0a0977..1ea8026 100644 --- a/greenwave/config.py +++ b/greenwave/config.py @@ -31,6 +31,8 @@ class Config(object): POLICIES_DIR = '/etc/greenwave/policies' + MESSAGING = 'fedmsg' + # By default, don't cache anything. CACHE = {'backend': 'dogpile.cache.null'} diff --git a/greenwave/consumers/resultsdb.py b/greenwave/consumers/resultsdb.py index e43795d..b83d1f9 100644 --- a/greenwave/consumers/resultsdb.py +++ b/greenwave/consumers/resultsdb.py @@ -24,6 +24,12 @@ from greenwave.policies import applicable_decision_context_product_version_pairs import xmlrpc.client +try: + import fedora_messaging.api + import fedora_messaging.exceptions +except ImportError: + pass + log = logging.getLogger(__name__) @@ -264,6 +270,8 @@ class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer): testcase=testcase, product_version=product_version) + log.info('Getting greenwave info') + for decision_context, product_version in sorted(contexts_product_versions): greenwave_url = self.fedmsg_config['greenwave_api_url'] + '/decision' @@ -275,6 +283,7 @@ class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer): } try: + log.debug('querying greenwave at: %s', greenwave_url) decision = greenwave.resources.retrieve_decision(greenwave_url, data) # get old decision @@ -282,6 +291,7 @@ class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer): 'ignore_result': [result_id], }) old_decision = greenwave.resources.retrieve_decision(greenwave_url, data) + log.debug('old decision: %s', old_decision) except requests.exceptions.HTTPError as e: log.exception('Failed to retrieve decision for data=%s, error: %s', data, e) continue @@ -299,6 +309,30 @@ class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer): 'product_version': product_version, 'previous': old_decision, }) - log.debug('Emitted a fedmsg, %r, on the "%s" topic', decision, - 'greenwave.decision.update') - fedmsg.publish(topic='decision.update', msg=decision) + log.info( + 'Emitted a message on the bus, %r, with the topic ' + '"greenwave.decision.update"', decision) + if self.flask_app.config['MESSAGING'] == 'fedmsg': + log.debug(' - to fedmsg') + fedmsg.publish(topic='decision.update', msg=decision) + elif self.flask_app.config['MESSAGING'] == 'fedora-messaging': + log.debug(' - to fedora-messaging') + try: + msg = fedora_messaging.api.Message( + topic='greenwave.decision.update', + body=decision + ) + fedora_messaging.api.publish(msg) + except fedora_messaging.exceptions.PublishReturned as e: + log.warning( + 'Fedora Messaging broker rejected message %s: %s', + msg.id, e) + except fedora_messaging.exceptions.ConnectionException as e: + log.warning('Error sending message %s: %s', msg.id, e) + except Exception: # pylint: disable=broad-except + log.exception('Error sending fedora-messaging message') + else: + log.warning( + 'Unsupported messaging option: %s', + self.flask_app.config['MESSAGING'] + ) diff --git a/greenwave/consumers/waiverdb.py b/greenwave/consumers/waiverdb.py index 8c77175..ae91ec9 100644 --- a/greenwave/consumers/waiverdb.py +++ b/greenwave/consumers/waiverdb.py @@ -20,6 +20,13 @@ from greenwave.api_v1 import subject_type_identifier_to_list from greenwave.monitoring import publish_decision_exceptions_waiver_counter from greenwave.policies import applicable_decision_context_product_version_pairs +try: + import fedora_messaging.api + import fedora_messaging.exceptions +except ImportError: + pass + + requests_session = requests.Session() @@ -135,6 +142,30 @@ class WaiverDBHandler(fedmsg.consumers.FedmsgConsumer): 'product_version': product_version, 'previous': old_decision, }) - log.debug('Emitted a fedmsg, %r, on the "%s" topic', msg, - 'greenwave.decision.update') - fedmsg.publish(topic='decision.update', msg=msg) + log.info( + 'Emitted a message on the bus, %r, with the topic ' + '"greenwave.decision.update"', decision) + if self.flask_app.config['MESSAGING'] == 'fedmsg': + log.debug(' - to fedmsg') + fedmsg.publish(topic='decision.update', msg=msg) + elif self.flask_app.config['MESSAGING'] == 'fedora-message': + log.debug(' - to fedora-messaging') + try: + msg = fedora_messaging.api.Message( + topic='greenwave.decision.update', + body=msg + ) + fedora_messaging.api.publish(msg) + except fedora_messaging.exceptions.PublishReturned as e: + log.warning( + 'Fedora Messaging broker rejected message %s: %s', + msg.id, e) + except fedora_messaging.exceptions.ConnectionException as e: + log.warning('Error sending message %s: %s', msg.id, e) + except Exception: # pylint: disable=broad-except + log.exception('Error sending fedora-messaging message') + else: + log.warning( + 'Unsupported messaging option: %s', + self.flask_app.config['MESSAGING'] + ) diff --git a/requirements.txt b/requirements.txt index 3387cae..5d985bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ requests PyYAML dogpile.cache fedmsg[consumers] +fedora-messaging prometheus_client From e286251eee654962910eb7fa13ed9accd143dc7e Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 28 2019 11:27:54 +0000 Subject: [PATCH 2/5] Fix typos in the tests It seems pretty sure that we would want to use the variable created in the loop rather than relying on one that is defined outside of the loop. Both lead to the same results which is reassuring though. Signed-off-by: Pierre-Yves Chibon --- diff --git a/greenwave/tests/test_resultsdb_consumer.py b/greenwave/tests/test_resultsdb_consumer.py index f51458b..3b63e81 100644 --- a/greenwave/tests/test_resultsdb_consumer.py +++ b/greenwave/tests/test_resultsdb_consumer.py @@ -194,7 +194,7 @@ def test_remote_rule_decision_change( mock_call = mock_fedmsg.mock_calls[0][2] assert mock_call['topic'] == 'decision.update' - actual_msgs_sent = [mock_call['msg'] for call in mock_fedmsg.mock_calls] + actual_msgs_sent = [call[2]['msg'] for call in mock_fedmsg.mock_calls] assert actual_msgs_sent[0] == { 'decision_context': 'test_context', 'product_version': 'fedora-rawhide', @@ -397,7 +397,7 @@ def test_decision_change_for_modules( mock_call = mock_fedmsg.mock_calls[0][2] assert mock_call['topic'] == 'decision.update' - actual_msgs_sent = [mock_call['msg'] for call in mock_fedmsg.mock_calls] + actual_msgs_sent = [call[2]['msg'] for call in mock_fedmsg.mock_calls] assert actual_msgs_sent[0] == { 'decision_context': 'osci_compose_gate_modules', 'product_version': 'rhel-8', From 10c564bdaa97751d7debbf8e91d5fb5bb245fb36 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 28 2019 11:27:54 +0000 Subject: [PATCH 3/5] Adjust unit-test to check that fedora-messaging is properly called when configured Signed-off-by: Pierre-Yves Chibon --- diff --git a/greenwave/tests/test_resultsdb_consumer.py b/greenwave/tests/test_resultsdb_consumer.py index 3b63e81..e87e929 100644 --- a/greenwave/tests/test_resultsdb_consumer.py +++ b/greenwave/tests/test_resultsdb_consumer.py @@ -1,6 +1,7 @@ # SPDX-License-Identifier: GPL-2.0+ import mock +import pytest from textwrap import dedent @@ -109,185 +110,201 @@ def test_no_announcement_subjects_for_old_compose_message(): assert subjects == [] +parameters = [ + ('fedmsg', 'greenwave.consumers.resultsdb.fedmsg.publish'), + ('fedora-messaging', 'greenwave.consumers.resultsdb.fedora_messaging.api.publish'), +] + +@pytest.mark.parametrize("config,publish", parameters) @mock.patch('greenwave.resources.ResultsRetriever.retrieve') @mock.patch('greenwave.resources.retrieve_decision') @mock.patch('greenwave.resources.retrieve_scm_from_koji') @mock.patch('greenwave.resources.retrieve_yaml_remote_rule') -@mock.patch('greenwave.consumers.resultsdb.fedmsg.publish') def test_remote_rule_decision_change( - mock_fedmsg, mock_retrieve_yaml_remote_rule, mock_retrieve_scm_from_koji, mock_retrieve_decision, - mock_retrieve_results): + mock_retrieve_results, + config, + publish): """ Test publishing decision change message for test cases mentioned in gating.yaml. """ - # gating.yaml - gating_yaml = dedent(""" - --- !Policy - product_versions: [fedora-rawhide, notexisting_prodversion] - decision_context: test_context - rules: - - !PassingTestCaseRule {test_case_name: dist.rpmdeplint} - """) - mock_retrieve_yaml_remote_rule.return_value = gating_yaml - - policies = dedent(""" - --- !Policy - id: test_policy - product_versions: [fedora-rawhide] - decision_context: test_context - subject_type: koji_build - rules: - - !RemoteRule {} - """) - - nvr = 'nethack-1.2.3-1.rawhide' - result = { - 'id': 1, - 'testcase': {'name': 'dist.rpmdeplint'}, - 'outcome': 'PASSED', - 'data': {'item': nvr, 'type': 'koji_build'}, - } - mock_retrieve_results.return_value = [result] - - def retrieve_decision(url, data): - #pylint: disable=unused-argument - if 'ignore_result' in data: - return None - return {} - mock_retrieve_decision.side_effect = retrieve_decision - mock_retrieve_scm_from_koji.return_value = ('rpms', nvr, - 'c3c47a08a66451cb9686c49f040776ed35a0d1bb') - - message = { - 'body': { - 'topic': 'resultsdb.result.new', - 'msg': { - 'id': result['id'], + with mock.patch('greenwave.config.Config.MESSAGING', config): + with mock.patch(publish) as mock_fedmsg: + # gating.yaml + gating_yaml = dedent(""" + --- !Policy + product_versions: [fedora-rawhide, notexisting_prodversion] + decision_context: test_context + rules: + - !PassingTestCaseRule {test_case_name: dist.rpmdeplint} + """) + mock_retrieve_yaml_remote_rule.return_value = gating_yaml + + policies = dedent(""" + --- !Policy + id: test_policy + product_versions: [fedora-rawhide] + decision_context: test_context + subject_type: koji_build + rules: + - !RemoteRule {} + """) + + nvr = 'nethack-1.2.3-1.rawhide' + result = { + 'id': 1, + 'testcase': {'name': 'dist.rpmdeplint'}, 'outcome': 'PASSED', - 'testcase': { - 'name': 'dist.rpmdeplint', - }, - 'data': { - 'item': [nvr], - 'type': ['koji_build'], + 'data': {'item': nvr, 'type': 'koji_build'}, + } + mock_retrieve_results.return_value = [result] + + def retrieve_decision(url, data): + #pylint: disable=unused-argument + if 'ignore_result' in data: + return None + return {} + mock_retrieve_decision.side_effect = retrieve_decision + mock_retrieve_scm_from_koji.return_value = ('rpms', nvr, + 'c3c47a08a66451cb9686c49f040776ed35a0d1bb') + + message = { + 'body': { + 'topic': 'resultsdb.result.new', + 'msg': { + 'id': result['id'], + 'outcome': 'PASSED', + 'testcase': { + 'name': 'dist.rpmdeplint', + }, + 'data': { + 'item': [nvr], + 'type': ['koji_build'], + } + } } } - } - } - hub = mock.MagicMock() - hub.config = { - 'environment': 'environment', - 'topic_prefix': 'topic_prefix', - } - handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub) - - handler.flask_app.config['policies'] = Policy.safe_load_all(policies) - with handler.flask_app.app_context(): - handler.consume(message) - - assert len(mock_fedmsg.mock_calls) == 1 - - mock_call = mock_fedmsg.mock_calls[0][2] - assert mock_call['topic'] == 'decision.update' - - actual_msgs_sent = [call[2]['msg'] for call in mock_fedmsg.mock_calls] - assert actual_msgs_sent[0] == { - 'decision_context': 'test_context', - 'product_version': 'fedora-rawhide', - 'subject': [ - {'item': nvr, 'type': 'koji_build'}, - ], - 'subject_type': 'koji_build', - 'subject_identifier': nvr, - 'previous': None, - } + hub = mock.MagicMock() + hub.config = { + 'environment': 'environment', + 'topic_prefix': 'topic_prefix', + } + handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub) + + handler.flask_app.config['policies'] = Policy.safe_load_all(policies) + with handler.flask_app.app_context(): + handler.consume(message) + + assert len(mock_fedmsg.mock_calls) == 1 + + if config == "fedmsg": + mock_call = mock_fedmsg.mock_calls[0][2] + assert mock_call['topic'] == 'decision.update' + actual_msgs_sent = mock_call['msg'] + else: + mock_call = mock_fedmsg.mock_calls[0][1][0] + assert mock_call.topic == 'greenwave.decision.update' + actual_msgs_sent = mock_call.body + + assert actual_msgs_sent == { + 'decision_context': 'test_context', + 'product_version': 'fedora-rawhide', + 'subject': [ + {'item': nvr, 'type': 'koji_build'}, + ], + 'subject_type': 'koji_build', + 'subject_identifier': nvr, + 'previous': None, + } +@pytest.mark.parametrize("config,publish", parameters) @mock.patch('greenwave.resources.ResultsRetriever.retrieve') @mock.patch('greenwave.resources.retrieve_decision') @mock.patch('greenwave.resources.retrieve_scm_from_koji') @mock.patch('greenwave.resources.retrieve_yaml_remote_rule') -@mock.patch('greenwave.consumers.resultsdb.fedmsg.publish') def test_remote_rule_decision_change_not_matching( - mock_fedmsg, mock_retrieve_yaml_remote_rule, mock_retrieve_scm_from_koji, mock_retrieve_decision, - mock_retrieve_results): + mock_retrieve_results, + config, + publish): """ Test publishing decision change message for test cases mentioned in gating.yaml. """ - # gating.yaml - gating_yaml = dedent(""" - --- !Policy - product_versions: [fedora-rawhide] - decision_context: test_context - rules: - - !PassingTestCaseRule {test_case_name: dist.rpmdeplint} - """) - mock_retrieve_yaml_remote_rule.return_value = gating_yaml - - policies = dedent(""" - --- !Policy - id: test_policy - product_versions: [fedora-rawhide] - decision_context: another_test_context - subject_type: koji_build - rules: - - !RemoteRule {} - """) - - nvr = 'nethack-1.2.3-1.rawhide' - result = { - 'id': 1, - 'testcase': {'name': 'dist.rpmdeplint'}, - 'outcome': 'PASSED', - 'data': {'item': nvr, 'type': 'koji_build'}, - } - mock_retrieve_results.return_value = [result] - - def retrieve_decision(url, data): - #pylint: disable=unused-argument - if 'ignore_result' in data: - return None - return {} - mock_retrieve_decision.side_effect = retrieve_decision - mock_retrieve_scm_from_koji.return_value = ('rpms', nvr, - 'c3c47a08a66451cb9686c49f040776ed35a0d1bb') - - message = { - 'body': { - 'topic': 'resultsdb.result.new', - 'msg': { - 'id': result['id'], + with mock.patch('greenwave.config.Config.MESSAGING', config): + with mock.patch(publish) as mock_fedmsg: + # gating.yaml + gating_yaml = dedent(""" + --- !Policy + product_versions: [fedora-rawhide] + decision_context: test_context + rules: + - !PassingTestCaseRule {test_case_name: dist.rpmdeplint} + """) + mock_retrieve_yaml_remote_rule.return_value = gating_yaml + + policies = dedent(""" + --- !Policy + id: test_policy + product_versions: [fedora-rawhide] + decision_context: another_test_context + subject_type: koji_build + rules: + - !RemoteRule {} + """) + + nvr = 'nethack-1.2.3-1.rawhide' + result = { + 'id': 1, + 'testcase': {'name': 'dist.rpmdeplint'}, 'outcome': 'PASSED', - 'testcase': { - 'name': 'dist.rpmdeplint', - }, - 'data': { - 'item': [nvr], - 'type': ['koji_build'], + 'data': {'item': nvr, 'type': 'koji_build'}, + } + mock_retrieve_results.return_value = [result] + + def retrieve_decision(url, data): + #pylint: disable=unused-argument + if 'ignore_result' in data: + return None + return {} + mock_retrieve_decision.side_effect = retrieve_decision + mock_retrieve_scm_from_koji.return_value = ('rpms', nvr, + 'c3c47a08a66451cb9686c49f040776ed35a0d1bb') + + message = { + 'body': { + 'topic': 'resultsdb.result.new', + 'msg': { + 'id': result['id'], + 'outcome': 'PASSED', + 'testcase': { + 'name': 'dist.rpmdeplint', + }, + 'data': { + 'item': [nvr], + 'type': ['koji_build'], + } + } } } - } - } - hub = mock.MagicMock() - hub.config = { - 'environment': 'environment', - 'topic_prefix': 'topic_prefix', - } - handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub) + hub = mock.MagicMock() + hub.config = { + 'environment': 'environment', + 'topic_prefix': 'topic_prefix', + } + handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub) - handler.flask_app.config['policies'] = Policy.safe_load_all(policies) - with handler.flask_app.app_context(): - handler.consume(message) + handler.flask_app.config['policies'] = Policy.safe_load_all(policies) + with handler.flask_app.app_context(): + handler.consume(message) - assert len(mock_fedmsg.mock_calls) == 0 + assert len(mock_fedmsg.mock_calls) == 0 def test_guess_product_version(): @@ -308,103 +325,111 @@ def test_guess_product_version(): assert product_version == 'rhel-8' +@pytest.mark.parametrize("config,publish", parameters) @mock.patch('greenwave.resources.ResultsRetriever.retrieve') @mock.patch('greenwave.resources.retrieve_decision') @mock.patch('greenwave.resources.retrieve_scm_from_koji') @mock.patch('greenwave.resources.retrieve_yaml_remote_rule') -@mock.patch('greenwave.consumers.resultsdb.fedmsg.publish') def test_decision_change_for_modules( - mock_fedmsg, mock_retrieve_yaml_remote_rule, mock_retrieve_scm_from_koji, mock_retrieve_decision, - mock_retrieve_results): + mock_retrieve_results, + config, + publish): """ Test publishing decision change message for a module. """ - - # gating.yaml - gating_yaml = dedent(""" - --- !Policy - product_versions: - - rhel-8 - decision_context: osci_compose_gate_modules - subject_type: redhat-module - rules: - - !PassingTestCaseRule {test_case_name: baseos-ci.redhat-module.tier1.functional} - """) - mock_retrieve_yaml_remote_rule.return_value = gating_yaml - - policies = dedent(""" - --- !Policy - id: "osci_compose_modules" - product_versions: - - rhel-8 - decision_context: osci_compose_gate_modules - subject_type: redhat-module - blacklist: [] - rules: - - !RemoteRule {} - """) - - nsvc = 'python36-3.6-820181204160430.17efdbc7' - result = { - 'id': 1, - 'testcase': {'name': 'baseos-ci.redhat-module.tier1.functional'}, - 'outcome': 'PASSED', - 'data': {'item': nsvc, 'type': 'redhat-module'}, - } - mock_retrieve_results.return_value = [result] - - def retrieve_decision(url, data): - #pylint: disable=unused-argument - if 'ignore_result' in data: - return None - return {} - mock_retrieve_decision.side_effect = retrieve_decision - mock_retrieve_scm_from_koji.return_value = ('modules', nsvc, - '97273b80dd568bd15f9636b695f6001ecadb65e0') - - message = { - 'body': { - 'topic': 'resultsdb.result.new', - 'msg': { - 'id': result['id'], + with mock.patch('greenwave.config.Config.MESSAGING', config): + with mock.patch(publish) as mock_fedmsg: + + # gating.yaml + gating_yaml = dedent(""" + --- !Policy + product_versions: + - rhel-8 + decision_context: osci_compose_gate_modules + subject_type: redhat-module + rules: + - !PassingTestCaseRule {test_case_name: baseos-ci.redhat-module.tier1.functional} + """) + mock_retrieve_yaml_remote_rule.return_value = gating_yaml + + policies = dedent(""" + --- !Policy + id: "osci_compose_modules" + product_versions: + - rhel-8 + decision_context: osci_compose_gate_modules + subject_type: redhat-module + blacklist: [] + rules: + - !RemoteRule {} + """) + + nsvc = 'python36-3.6-820181204160430.17efdbc7' + result = { + 'id': 1, + 'testcase': {'name': 'baseos-ci.redhat-module.tier1.functional'}, 'outcome': 'PASSED', - 'testcase': { - 'name': 'baseos-ci.redhat-module.tier1.functional', - }, - 'data': { - 'item': [nsvc], - 'type': ['redhat-module'], + 'data': {'item': nsvc, 'type': 'redhat-module'}, + } + mock_retrieve_results.return_value = [result] + + def retrieve_decision(url, data): + #pylint: disable=unused-argument + if 'ignore_result' in data: + return None + return {} + mock_retrieve_decision.side_effect = retrieve_decision + mock_retrieve_scm_from_koji.return_value = ('modules', nsvc, + '97273b80dd568bd15f9636b695f6001ecadb65e0') + + message = { + 'body': { + 'topic': 'resultsdb.result.new', + 'msg': { + 'id': result['id'], + 'outcome': 'PASSED', + 'testcase': { + 'name': 'baseos-ci.redhat-module.tier1.functional', + }, + 'data': { + 'item': [nsvc], + 'type': ['redhat-module'], + } + } } } - } - } - hub = mock.MagicMock() - hub.config = { - 'environment': 'environment', - 'topic_prefix': 'topic_prefix', - } - handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub) - - handler.flask_app.config['policies'] = Policy.safe_load_all(policies) - with handler.flask_app.app_context(): - handler.consume(message) - - assert len(mock_fedmsg.mock_calls) == 1 - - mock_call = mock_fedmsg.mock_calls[0][2] - assert mock_call['topic'] == 'decision.update' - - actual_msgs_sent = [call[2]['msg'] for call in mock_fedmsg.mock_calls] - assert actual_msgs_sent[0] == { - 'decision_context': 'osci_compose_gate_modules', - 'product_version': 'rhel-8', - 'subject': [ - {'item': nsvc, 'type': 'redhat-module'}, - ], - 'subject_type': 'redhat-module', - 'subject_identifier': nsvc, - 'previous': None, - } + hub = mock.MagicMock() + hub.config = { + 'environment': 'environment', + 'topic_prefix': 'topic_prefix', + } + handler = greenwave.consumers.resultsdb.ResultsDBHandler(hub) + + handler.flask_app.config['policies'] = Policy.safe_load_all(policies) + with handler.flask_app.app_context(): + handler.consume(message) + + assert len(mock_fedmsg.mock_calls) == 1 + + if config == "fedmsg": + mock_call = mock_fedmsg.mock_calls[0][2] + assert mock_call['topic'] == 'decision.update' + actual_msgs_sent = mock_call['msg'] + else: + mock_call = mock_fedmsg.mock_calls[0][1][0] + assert mock_call.topic == 'greenwave.decision.update' + actual_msgs_sent = mock_call.body + + assert actual_msgs_sent == { + 'decision_context': 'osci_compose_gate_modules', + 'product_version': 'rhel-8', + 'subject': [ + {'item': nsvc, 'type': 'redhat-module'}, + ], + 'subject_type': 'redhat-module', + 'subject_identifier': nsvc, + 'previous': None, + } From 3846df92ed2a54e10a10a0367b2aed937b53bdf9 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 28 2019 11:27:54 +0000 Subject: [PATCH 4/5] Add a fedora-messaging consumer and an example configuration file Signed-off-by: Pierre-Yves Chibon --- diff --git a/config.toml.example b/config.toml.example new file mode 100644 index 0000000..728cc9b --- /dev/null +++ b/config.toml.example @@ -0,0 +1,67 @@ +# A sample configuration for fedora-messaging. This file is in the TOML format. +# For complete details on all configuration options, see the documentation +# https://fedora-messaging.readthedocs.io/en/latest/configuration.html. + +amqp_url = "amqp://" + +publish_exchange = "amq.topic" + +callback = "greenwave.consumers.fedora_messaging_consumer:fedora_messaging_callback" + +# Note the double brackets below. +# To add another binding, add another [[bindings]] section. +[[bindings]] +queue = "greenwave" +exchange = "amq.topic" +routing_keys = [ + "org.fedoraproject.prod.taskotron.result.new", + "org.fedoraproject.stg.taskotron.result.new", + "org.fedoraproject.prod.waiver.new", + "org.fedoraproject.stg.waiver.new", +] + +[tls] +ca_cert = "/etc/pki/tls/certs/ca-bundle.crt" +keyfile = "/my/client/key.pem" +certfile = "/my/client/cert.pem" + +[client_properties] +app = "greenwave" + +[queues.greenwave] +durable = true +auto_delete = false +exclusive = false +arguments = {} + +[qos] +prefetch_size = 0 +prefetch_count = 25 + +[log_config] +version = 1 +disable_existing_loggers = true + +[log_config.formatters.simple] +format = "[%(name)s %(levelname)s] %(message)s" + +[log_config.handlers.console] +class = "logging.StreamHandler" +formatter = "simple" +stream = "ext://sys.stdout" + +[log_config.loggers.fedora_messaging] +level = "INFO" +propagate = false +handlers = ["console"] + +[log_config.root] +level = "WARNING" +handlers = ["console"] + +# greenwave consumer configuration +[consumer_config] +topic_prefix = 'org.fedoraproject' +environment = 'dev' +waiverdb_topic_suffix = 'waiver.new' +resultsdb_topic_suffix = 'taskotron.result.new' diff --git a/greenwave/consumers/fedora_messaging_consumer.py b/greenwave/consumers/fedora_messaging_consumer.py new file mode 100644 index 0000000..a4d994f --- /dev/null +++ b/greenwave/consumers/fedora_messaging_consumer.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# SPDX-License-Identifier: GPL-2.0+ +""" +The fedora-messaging consumer. + +This module is responsible consuming messages sent to the fedora message bus via +fedora-messaging. +It will get all the messages and pass them onto their appropriate fedmsg +consumers to re-use the same code path. +""" + +import logging + +from greenwave.consumers.resultsdb import ResultsDBHandler +from greenwave.consumers.waiverdb import WaiverDBHandler + +from fedora_messaging.config import conf + + +log = logging.getLogger(__name__) + + +class Dummy(object): + """ Dummy object only storing a dictionary named "config" that can be passed + onto the fedmsg consumer. + """ + + def __init__(self, config): + self.config = config + + +def fedora_messaging_callback(message): + """ + Callback called when messages from fedora-messaging are received. + It then passes them onto their appropriate fedmsg handler for code + portability. + + Args: + message (fedora_messaging.message.Message): The message we received + from the queue. + """ + log.info( + 'Received message from fedora-messaging with topic: %s', message.topic) + consumer_config = conf["consumer_config"] + if message.topic.endswith("taskotron.result.new"): + # New resultsdb results + config = { + "topic_prefix": consumer_config["topic_prefix"], + "environment": consumer_config["environment"], + "resultsdb_topic_suffix": consumer_config["resultsdb_topic_suffix"] + } + hub = Dummy(config) + handler = ResultsDBHandler(hub) + msg = {"body": {'msg': message.body}} + log.info('Sending message received to: ResultsDBHandler') + handler.consume(msg) + + elif message.topic.endswith('waiver.new'): + # New waiver submitted + config = { + "topic_prefix": consumer_config["topic_prefix"], + "environment": consumer_config["environment"], + "waiverdb_topic_suffix": consumer_config["waiverdb_topic_suffix"] + } + hub = Dummy(config) + handler = WaiverDBHandler(hub) + msg = {"body": {'msg': message.body}} + log.info('Sending message received to: WaiverDBHandler') + handler.consume(msg) From fb1018381db491d9119c41e30e5ba5dfe34c237b Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Mar 28 2019 11:27:54 +0000 Subject: [PATCH 5/5] Drop the warning about unsupported messaging setting Signed-off-by: Pierre-Yves Chibon --- diff --git a/greenwave/consumers/resultsdb.py b/greenwave/consumers/resultsdb.py index b83d1f9..3bca365 100644 --- a/greenwave/consumers/resultsdb.py +++ b/greenwave/consumers/resultsdb.py @@ -331,8 +331,3 @@ class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer): log.warning('Error sending message %s: %s', msg.id, e) except Exception: # pylint: disable=broad-except log.exception('Error sending fedora-messaging message') - else: - log.warning( - 'Unsupported messaging option: %s', - self.flask_app.config['MESSAGING'] - ) diff --git a/greenwave/consumers/waiverdb.py b/greenwave/consumers/waiverdb.py index ae91ec9..b36644a 100644 --- a/greenwave/consumers/waiverdb.py +++ b/greenwave/consumers/waiverdb.py @@ -164,8 +164,3 @@ class WaiverDBHandler(fedmsg.consumers.FedmsgConsumer): log.warning('Error sending message %s: %s', msg.id, e) except Exception: # pylint: disable=broad-except log.exception('Error sending fedora-messaging message') - else: - log.warning( - 'Unsupported messaging option: %s', - self.flask_app.config['MESSAGING'] - )