#461 Refactor consumers
Merged 4 years ago by yashn. Opened 4 years ago by lholecek.
lholecek/greenwave refactor-consumers  into  master

Refactor consumers
Lukas Holecek • 4 years ago  
@@ -19,7 +19,7 @@ 

          cache_config)

  

  

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ @mock.patch('greenwave.consumers.consumer.fedmsg.publish')

  def test_consume_new_result(

          mock_fedmsg, requests_session, greenwave_server,

          testdatabuilder):
@@ -159,7 +159,7 @@ 

      }

  

  

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ @mock.patch('greenwave.consumers.consumer.fedmsg.publish')

  def test_consume_unchanged_result(

          mock_fedmsg, requests_session, greenwave_server,

          testdatabuilder):
@@ -193,7 +193,7 @@ 

      assert len(mock_fedmsg.mock_calls) == 0

  

  

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ @mock.patch('greenwave.consumers.consumer.fedmsg.publish')

  def test_consume_compose_id_result(

          mock_fedmsg, requests_session, greenwave_server,

          testdatabuilder):
@@ -262,7 +262,7 @@ 

      mock_fedmsg.assert_called_once_with(topic='decision.update', msg=msg)

  

  

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ @mock.patch('greenwave.consumers.consumer.fedmsg.publish')

  def test_consume_legacy_result(

          mock_fedmsg, requests_session, greenwave_server,

          testdatabuilder):
@@ -394,7 +394,7 @@ 

      mock_fedmsg.assert_any_call(topic='decision.update', msg=second_msg)

  

  

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ @mock.patch('greenwave.consumers.consumer.fedmsg.publish')

  def test_no_message_for_nonapplicable_policies(

          mock_fedmsg, requests_session, greenwave_server,

          testdatabuilder):
@@ -432,7 +432,7 @@ 

      mock_fedmsg.assert_not_called()

  

  

- @mock.patch('greenwave.consumers.resultsdb.fedmsg.publish')

+ @mock.patch('greenwave.consumers.consumer.fedmsg.publish')

  def test_consume_new_result_container_image(

          mock_fedmsg, requests_session, greenwave_server,

          testdatabuilder):

@@ -23,7 +23,7 @@ 

  

  

  @pytest.mark.parametrize('subject_type', ('koji_build', 'brew-build'))

- @mock.patch('greenwave.consumers.waiverdb.fedmsg.publish')

+ @mock.patch('greenwave.consumers.consumer.fedmsg.publish')

  def test_consume_new_waiver(

          mock_fedmsg, requests_session, greenwave_server, testdatabuilder,

          subject_type):

@@ -0,0 +1,222 @@ 

+ # SPDX-License-Identifier: GPL-2.0+

+ import fedmsg

+ import logging

+ import requests

+ 

+ import fedmsg.consumers

+ 

+ import greenwave.app_factory

+ from greenwave.api_v1 import subject_type_identifier_to_list

+ from greenwave.monitor import (

+     publish_decision_exceptions_result_counter,

+     messaging_tx_to_send_counter, messaging_tx_stopped_counter,

+     messaging_tx_sent_ok_counter, messaging_tx_failed_counter)

+ from greenwave.policies import applicable_decision_context_product_version_pairs

+ from greenwave.utils import right_before_this_time

+ 

+ import greenwave.resources

+ 

+ try:

+     import fedora_messaging.api

+     import fedora_messaging.exceptions

+ except ImportError:

+     pass

+ 

+ log = logging.getLogger(__name__)

+ 

+ 

+ def _equals_except_keys(lhs, rhs, except_keys):

+     keys = lhs.keys() - except_keys

+     return lhs.keys() == rhs.keys() \

+         and all(lhs[key] == rhs[key] for key in keys)

+ 

+ 

+ def _is_decision_unchanged(old_decision, decision):

+     """

+     Returns true only if new decision is same as old one

+     (ignores result_id values).

+     """

+     if old_decision is None or decision is None:

+         return old_decision == decision

+ 

+     requirements_keys = ('satisfied_requirements', 'unsatisfied_requirements')

+     if not _equals_except_keys(old_decision, decision, requirements_keys):

+         return False

+ 

+     ignore_keys = ('result_id',)

+     for key in requirements_keys:

+         old_requirements = old_decision[key]

+         requirements = decision[key]

+         if len(old_requirements) != len(requirements):

+             return False

+ 

+         for old_requirement, requirement in zip(old_requirements, requirements):

+             if not _equals_except_keys(old_requirement, requirement, ignore_keys):

+                 return False

+ 

+     return True

+ 

+ 

+ class Consumer(fedmsg.consumers.FedmsgConsumer):

+     """

+     Base class for consumers.

+     """

+     config_key = 'greenwave_handler'

+     hub_config_prefix = 'greenwave_consumer_'

+     default_topic = 'item.new'

+     monitor_labels = {'handler': 'greenwave_consumer'}

+ 

+     def __init__(self, hub, *args, **kwargs):

+         """

+         Initialize the consumer, subscribing it to the appropriate topics.

+ 

+         Args:

+             hub (moksha.hub.hub.CentralMokshaHub): The hub from which this handler is consuming

+                 messages. It is used to look up the hub config.

+         """

+ 

+         prefix = hub.config.get('topic_prefix')

+         env = hub.config.get('environment')

+         suffix = hub.config.get(f'{self.hub_config_prefix}topic_suffix', self.default_topic)

+         self.topic = ['.'.join([prefix, env, suffix])]

+         self.fedmsg_config = fedmsg.config.load_config()

+ 

+         config = kwargs.pop('config', None)

+ 

+         super().__init__(hub, *args, **kwargs)

+ 

+         self.flask_app = greenwave.app_factory.create_app(config)

+         self.greenwave_api_url = self.flask_app.config['GREENWAVE_API_URL']

+         log.info('Greenwave handler listening on: %s', self.topic)

+ 

+     def consume(self, message):

+         """

+         Process the given message and take action.

+ 

+         Args:

+             message (munch.Munch): A fedmsg about a new item.

+         """

+         message = message.get('body', message)

+         log.debug('Processing message "%s"', message)

+ 

+         with self.flask_app.app_context():

+             self._consume_message(message)

+ 

+     def _inc(self, messaging_counter):

+         """Helper method to increase monitoring counter."""

+         messaging_counter.labels(**self.monitor_labels).inc()

+ 

+     def _publish_decision_update_fedmsg(self, decision):

+         try:

+             fedmsg.publish(topic='decision.update', msg=decision)

+             self._inc(messaging_tx_sent_ok_counter)

+         except Exception:

+             log.exception('Error sending fedmsg message')

+             self._inc(messaging_tx_failed_counter)

+             raise

+ 

+     def _publish_decision_update_fedora_messaging(self, decision):

+         try:

+             msg = fedora_messaging.api.Message(

+                 topic='greenwave.decision.update',

+                 body=decision

+             )

+             fedora_messaging.api.publish(msg)

+             self._inc(messaging_tx_sent_ok_counter)

+         except fedora_messaging.exceptions.PublishReturned as e:

+             log.warning(

+                 'Fedora Messaging broker rejected message %s: %s',

+                 msg.id, e)

+             self._inc(messaging_tx_stopped_counter)

+         except fedora_messaging.exceptions.ConnectionException as e:

+             log.warning('Error sending message %s: %s', msg.id, e)

+             self._inc(messaging_tx_failed_counter)

+         except Exception:  # pylint: disable=broad-except

+             log.exception('Error sending fedora-messaging message')

+             self._inc(messaging_tx_failed_counter)

+ 

+     def _old_and_new_decisions(self, submit_time, **request_data):

+         """Returns decision before and after submit time."""

+         greenwave_url = self.greenwave_api_url + '/decision'

+         log.debug('querying greenwave at: %s', greenwave_url)

+ 

+         try:

+             decision = greenwave.resources.retrieve_decision(greenwave_url, request_data)

+ 

+             request_data['when'] = right_before_this_time(submit_time)

+             old_decision = greenwave.resources.retrieve_decision(greenwave_url, request_data)

+             log.debug('old decision: %s', old_decision)

+         except requests.exceptions.HTTPError as e:

+             log.exception('Failed to retrieve decision for data=%s, error: %s', request_data, e)

+             self._inc(messaging_tx_stopped_counter)

+             return None, None

+ 

+         return old_decision, decision

+ 

+     @publish_decision_exceptions_result_counter.count_exceptions()

+     def _publish_decision_change(

+             self,

+             submit_time,

+             subject_type,

+             subject_identifier,

+             testcase,

+             product_version,

+             publish_testcase):

+ 

+         policy_attributes = dict(

+             subject_type=subject_type,

+             subject_identifier=subject_identifier,

+             testcase=testcase,

+         )

+ 

+         if product_version:

+             policy_attributes['product_version'] = product_version

+ 

+         policies = self.flask_app.config['policies']

+         contexts_product_versions = applicable_decision_context_product_version_pairs(

+             policies, **policy_attributes)

+ 

+         log.info('Getting greenwave info')

+ 

+         for decision_context, product_version in sorted(contexts_product_versions):

+             self._inc(messaging_tx_to_send_counter)

+ 

+             old_decision, decision = self._old_and_new_decisions(

+                 submit_time,

+                 decision_context=decision_context,

+                 product_version=product_version,

+                 subject_type=subject_type,

+                 subject_identifier=subject_identifier,

+             )

+             if decision is None:

+                 continue

+ 

+             if _is_decision_unchanged(old_decision, decision):

+                 log.debug('Skipped emitting fedmsg, decision did not change: %s', decision)

+                 self._inc(messaging_tx_stopped_counter)

+                 continue

+ 

+             decision.update({

+                 'subject_type': subject_type,

+                 'subject_identifier': subject_identifier,

+                 # subject is for backwards compatibility only:

+                 'subject': subject_type_identifier_to_list(subject_type,

+                                                            subject_identifier),

+                 'decision_context': decision_context,

+                 'product_version': product_version,

+                 'previous': old_decision,

+             })

+             if publish_testcase:

+                 decision['testcase'] = testcase

+ 

+             log.info(

+                 'Emitting a message on the bus, %r, with the topic '

+                 '"greenwave.decision.update"', decision)

+             if self.flask_app.config['MESSAGING'] == 'fedmsg':

+                 log.debug('  - to fedmsg')

+                 self._publish_decision_update_fedmsg(decision)

+             elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':

+                 log.debug('  - to fedora-messaging')

+                 self._publish_decision_update_fedora_messaging(decision)

+ 

+             self._inc(messaging_tx_stopped_counter)

file modified
+21 -192
@@ -12,28 +12,10 @@ 

  import logging

  import re

  

- import fedmsg.consumers

- import requests

- 

- import greenwave.app_factory

- import greenwave.resources

- from greenwave.api_v1 import subject_type_identifier_to_list

- from greenwave.monitor import (

-     publish_decision_exceptions_result_counter,

-     messaging_tx_to_send_counter, messaging_tx_stopped_counter,

-     messaging_tx_sent_ok_counter, messaging_tx_failed_counter)

- from greenwave.policies import applicable_decision_context_product_version_pairs

- from greenwave.utils import right_before_this_time

+ from greenwave.consumers.consumer import Consumer

  

  import xmlrpc.client

  

- try:

-     import fedora_messaging.api

-     import fedora_messaging.exceptions

- except ImportError:

-     pass

- 

- 

  log = logging.getLogger(__name__)

  

  
@@ -93,39 +75,7 @@ 

              pass

  

  

- def _equals_except_keys(lhs, rhs, except_keys):

-     keys = lhs.keys() - except_keys

-     return lhs.keys() == rhs.keys() \

-         and all(lhs[key] == rhs[key] for key in keys)

- 

- 

- def _is_decision_unchanged(old_decision, decision):

-     """

-     Returns true only if new decision is same as old one

-     (ignores result_id values).

-     """

-     if old_decision is None or decision is None:

-         return old_decision == decision

- 

-     requirements_keys = ('satisfied_requirements', 'unsatisfied_requirements')

-     if not _equals_except_keys(old_decision, decision, requirements_keys):

-         return False

- 

-     ignore_keys = ('result_id',)

-     for key in requirements_keys:

-         old_requirements = old_decision[key]

-         requirements = decision[key]

-         if len(old_requirements) != len(requirements):

-             return False

- 

-         for old_requirement, requirement in zip(old_requirements, requirements):

-             if not _equals_except_keys(old_requirement, requirement, ignore_keys):

-                 return False

- 

-     return True

- 

- 

- class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer):

+ class ResultsDBHandler(Consumer):

      """

      Handle a new result.

  
@@ -134,29 +84,12 @@ 

      """

  

      config_key = 'resultsdb_handler'

+     hub_config_prefix = 'resultsdb_'

+     default_topic = 'taskotron.result.new'

+     monitor_labels = {'handler': 'resultsdb'}

  

-     def __init__(self, hub, *args, **kwargs):

-         """

-         Initialize the ResultsDBHandler, subscribing it to the appropriate topics.

- 

-         Args:

-             hub (moksha.hub.hub.CentralMokshaHub): The hub from which this handler is consuming

-                 messages. It is used to look up the hub config.

-         """

- 

-         prefix = hub.config.get('topic_prefix')

-         env = hub.config.get('environment')

-         suffix = hub.config.get('resultsdb_topic_suffix', 'taskotron.result.new')

-         self.topic = ['.'.join([prefix, env, suffix])]

-         self.fedmsg_config = fedmsg.config.load_config()

- 

-         config = kwargs.pop('config', None)

- 

-         super(ResultsDBHandler, self).__init__(hub, *args, **kwargs)

- 

-         self.flask_app = greenwave.app_factory.create_app(config)

-         self.greenwave_api_url = self.flask_app.config['GREENWAVE_API_URL']

-         self.cache = self.flask_app.cache

This is lost. Intentional?

This is lost. Intentional?

Yes, the cache has been removed for resultsdb some time ago and this variable was left unused.

+     def __init__(self, *args, **kwargs):

+         super().__init__(*args, **kwargs)

  

          koji_base_url = self.flask_app.config['KOJI_BASE_URL']

          if koji_base_url:
@@ -164,8 +97,6 @@ 

          else:

              self.koji_proxy = None

  

-         log.info('Greenwave resultsdb handler listening on: %s', self.topic)

- 

      @staticmethod

      def announcement_subjects(message):

          """
@@ -213,17 +144,9 @@ 

          elif 'item' in data and _type:

              yield (_type, _decode(data['item']))

  

-     def consume(self, message):

-         """

-         Process the given message and take action.

- 

-         Args:

-             message (munch.Munch): A fedmsg about a new result.

-         """

-         message = message.get('body', message)

-         log.debug('Processing message "%s"', message)

- 

+     def _consume_message(self, message):

          msg = message['msg']

+ 

          try:

              testcase = msg['testcase']['name']

          except KeyError:
@@ -234,111 +157,17 @@ 

          except KeyError:

              submit_time = msg['result']['submit_time']

  

-         with self.flask_app.app_context():

-             for subject_type, subject_identifier in self.announcement_subjects(message):

-                 log.debug('Considering subject %s: %r', subject_type, subject_identifier)

-                 self._publish_decision_changes(subject_type, subject_identifier,

-                                                submit_time, testcase)

- 

-     @publish_decision_exceptions_result_counter.count_exceptions()

-     def _publish_decision_changes(self, subject_type, subject_identifier, submit_time, testcase):

-         """

-         Process the given subject and publish a message if the decision is changed.

+         for subject_type, subject_identifier in self.announcement_subjects(message):

+             log.debug('Considering subject %s: %r', subject_type, subject_identifier)

  

-         Args:

-             subject_type (munch.Munch): subject type argument, used to query greenwave.

-             subject_identifier (munch.Munch): subject identifier argument, used to query greenwave.

-             submit_time (string): date. After this date, results will be ignored for comparison.

-             testcase (munch.Munch): the name of a testcase to consider.

-         """

-         policy_attributes = dict(

-             subject_type=subject_type,

-             subject_identifier=subject_identifier,

-             testcase=testcase,

-         )

- 

-         product_version = _subject_product_version(

-             subject_identifier, subject_type, self.koji_proxy)

-         if product_version:

-             policy_attributes['product_version'] = product_version

- 

-         policies = self.flask_app.config['policies']

-         contexts_product_versions = applicable_decision_context_product_version_pairs(

-             policies, **policy_attributes)

- 

-         log.info('Getting greenwave info')

- 

-         for decision_context, product_version in sorted(contexts_product_versions):

-             messaging_tx_to_send_counter.labels(handler='resultsdb').inc()

-             greenwave_url = self.greenwave_api_url + '/decision'

- 

-             data = {

-                 'decision_context': decision_context,

-                 'product_version': product_version,

-                 'subject_type': subject_type,

-                 'subject_identifier': subject_identifier,

-             }

+             product_version = _subject_product_version(

+                 subject_identifier, subject_type, self.koji_proxy)

  

-             try:

-                 log.debug('querying greenwave at: %s', greenwave_url)

-                 decision = greenwave.resources.retrieve_decision(greenwave_url, data)

- 

-                 # get old decision

-                 data.update({

-                     'when': right_before_this_time(submit_time),

-                 })

-                 old_decision = greenwave.resources.retrieve_decision(greenwave_url, data)

-                 log.debug('old decision: %s', old_decision)

-             except requests.exceptions.HTTPError as e:

-                 log.exception('Failed to retrieve decision for data=%s, error: %s', data, e)

-                 messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

-                 continue

- 

-             if _is_decision_unchanged(old_decision, decision):

-                 log.debug('Skipped emitting fedmsg, decision did not change: %s', decision)

-                 messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

-                 continue

- 

-             decision.update({

-                 'subject_type': subject_type,

-                 'subject_identifier': subject_identifier,

-                 # subject is for backwards compatibility only:

-                 'subject': subject_type_identifier_to_list(subject_type,

-                                                            subject_identifier),

-                 'decision_context': decision_context,

-                 'product_version': product_version,

-                 'previous': old_decision,

-             })

-             log.info(

-                 'Emitting a message on the bus, %r, with the topic '

-                 '"greenwave.decision.update"', decision)

-             if self.flask_app.config['MESSAGING'] == 'fedmsg':

-                 log.debug('  - to fedmsg')

-                 try:

-                     fedmsg.publish(topic='decision.update', msg=decision)

-                     messaging_tx_sent_ok_counter.labels(handler='resultsdb').inc()

-                 except Exception:

-                     messaging_tx_failed_counter.labels(handler='resultsdb').inc()

-                     raise

-             elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':

-                 log.debug('  - to fedora-messaging')

-                 try:

-                     msg = fedora_messaging.api.Message(

-                         topic='greenwave.decision.update',

-                         body=decision

-                     )

-                     fedora_messaging.api.publish(msg)

-                     messaging_tx_sent_ok_counter.labels(handler='resultsdb').inc()

-                 except fedora_messaging.exceptions.PublishReturned as e:

-                     log.warning(

-                         'Fedora Messaging broker rejected message %s: %s',

-                         msg.id, e)

-                     messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

-                 except fedora_messaging.exceptions.ConnectionException as e:

-                     log.warning('Error sending message %s: %s', msg.id, e)

-                     messaging_tx_failed_counter.labels(handler='resultsdb').inc()

-                 except Exception:  # pylint: disable=broad-except

-                     log.exception('Error sending fedora-messaging message')

-                     messaging_tx_failed_counter.labels(handler='resultsdb').inc()

- 

-             messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

+             self._publish_decision_change(

+                 submit_time=submit_time,

+                 subject_type=subject_type,

+                 subject_identifier=subject_identifier,

+                 testcase=testcase,

+                 product_version=product_version,

+                 publish_testcase=False,

+             )

file modified
+11 -157
@@ -9,35 +9,10 @@ 

  to the message bus about the newly satisfied/unsatisfied policy.

  """

  

- import logging

- import json

+ from greenwave.consumers.consumer import Consumer

  

- import fedmsg.consumers

  

- import greenwave.app_factory

- from greenwave.api_v1 import subject_type_identifier_to_list

- from greenwave.monitor import (

-     publish_decision_exceptions_waiver_counter,

-     messaging_tx_to_send_counter, messaging_tx_stopped_counter,

-     messaging_tx_sent_ok_counter, messaging_tx_failed_counter)

- from greenwave.policies import applicable_decision_context_product_version_pairs

- from greenwave.utils import right_before_this_time

- from greenwave.request_session import get_requests_session

- 

- try:

-     import fedora_messaging.api

-     import fedora_messaging.exceptions

- except ImportError:

-     pass

- 

- 

- requests_session = get_requests_session()

- 

- 

- log = logging.getLogger(__name__)

- 

- 

- class WaiverDBHandler(fedmsg.consumers.FedmsgConsumer):

+ class WaiverDBHandler(Consumer):

      """

      Handle a new waiver.

  
@@ -46,39 +21,11 @@ 

      """

  

      config_key = 'waiverdb_handler'

+     hub_config_prefix = 'waiverdb_'

+     default_topic = 'waiver.new'

+     monitor_labels = {'handler': 'waiverdb'}

  

-     def __init__(self, hub, *args, **kwargs):

-         """

-         Initialize the WaiverDBHandler, subscribing it to the appropriate topics.

- 

-         Args:

-             hub (moksha.hub.hub.CentralMokshaHub): The hub from which this handler is consuming

-                 messages. It is used to look up the hub config.

-         """

- 

-         prefix = hub.config.get('topic_prefix')

-         env = hub.config.get('environment')

-         suffix = hub.config.get('waiverdb_topic_suffix', 'waiver.new')

-         self.topic = ['.'.join([prefix, env, suffix])]

-         self.fedmsg_config = fedmsg.config.load_config()

- 

-         config = kwargs.pop('config', None)

- 

-         super(WaiverDBHandler, self).__init__(hub, *args, **kwargs)

- 

-         self.flask_app = greenwave.app_factory.create_app(config)

-         self.greenwave_api_url = self.flask_app.config['GREENWAVE_API_URL']

-         log.info('Greenwave waiverdb handler listening on: %s', self.topic)

- 

-     def consume(self, message):

-         """

-         Process the given message and publish a message if the decision is changed.

- 

-         Args:

-             message (munch.Munch): A fedmsg about a new waiver.

-         """

-         message = message.get('body', message)

-         log.debug('Processing message "%s"', message)

+     def _consume_message(self, message):

          msg = message['msg']

  

          product_version = msg['product_version']
@@ -87,104 +34,11 @@ 

          subject_identifier = msg['subject_identifier']

          submit_time = msg['timestamp']

  

-         with self.flask_app.app_context():

-             self._publish_decision_changes(subject_type, subject_identifier, submit_time,

-                                            product_version, testcase)

- 

-     @publish_decision_exceptions_waiver_counter.count_exceptions()

-     def _publish_decision_changes(self, subject_type, subject_identifier, submit_time,

-                                   product_version, testcase):

-         policies = self.flask_app.config['policies']

-         contexts_product_versions = applicable_decision_context_product_version_pairs(

-             policies,

+         self._publish_decision_change(

+             submit_time=submit_time,

              subject_type=subject_type,

              subject_identifier=subject_identifier,

              testcase=testcase,

-             product_version=product_version)

- 

-         for decision_context, product_version in sorted(contexts_product_versions):

-             messaging_tx_to_send_counter.labels(handler='waiverdb').inc()

-             data = {

-                 'decision_context': decision_context,

-                 'product_version': product_version,

-                 'subject_type': subject_type,

-                 'subject_identifier': subject_identifier,

-             }

-             response = requests_session.post(

-                 self.greenwave_api_url + '/decision',

-                 headers={'Content-Type': 'application/json'},

-                 data=json.dumps(data))

- 

-             if not response.ok:

-                 log.error(response.text)

-                 messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

-                 continue

- 

-             decision = response.json()

- 

-             # get old decision

-             data.update({

-                 'when': right_before_this_time(submit_time),

-             })

-             response = requests_session.post(

-                 self.greenwave_api_url + '/decision',

-                 headers={'Content-Type': 'application/json'},

-                 data=json.dumps(data))

- 

-             if not response.ok:

-                 log.error(response.text)

-                 messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

-                 continue

- 

-             old_decision = response.json()

- 

-             if decision == old_decision:

-                 log.debug('Skipped emitting fedmsg, decision did not change: %s', decision)

-                 messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

-                 continue

- 

-             msg = decision

-             decision.update({

-                 'subject_type': subject_type,

-                 'subject_identifier': subject_identifier,

-                 # subject is for backwards compatibility only:

-                 'subject': subject_type_identifier_to_list(subject_type,

-                                                            subject_identifier),

-                 'testcase': testcase,

-                 'decision_context': decision_context,

-                 'product_version': product_version,

-                 'previous': old_decision,

-             })

-             log.info(

-                 'Emitting a message on the bus, %r, with the topic '

-                 '"greenwave.decision.update"', decision)

-             if self.flask_app.config['MESSAGING'] == 'fedmsg':

-                 log.debug('  - to fedmsg')

-                 try:

-                     fedmsg.publish(topic='decision.update', msg=msg)

-                     messaging_tx_sent_ok_counter.labels(handler='waiverdb').inc()

-                 except Exception:

-                     messaging_tx_failed_counter.labels(handler='waiverdb').inc()

-                     raise

-             elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':

-                 log.debug('  - to fedora-messaging')

-                 try:

-                     msg = fedora_messaging.api.Message(

-                         topic='greenwave.decision.update',

-                         body=msg

-                     )

-                     fedora_messaging.api.publish(msg)

-                     messaging_tx_sent_ok_counter.labels(handler='waiverdb').inc()

-                 except fedora_messaging.exceptions.PublishReturned as e:

-                     log.warning(

-                         'Fedora Messaging broker rejected message %s: %s',

-                         msg.id, e)

-                     messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

-                 except fedora_messaging.exceptions.ConnectionException as e:

-                     log.warning('Error sending message %s: %s', msg.id, e)

-                     messaging_tx_failed_counter.labels(handler='waiverdb').inc()

-                 except Exception:  # pylint: disable=broad-except

-                     log.exception('Error sending fedora-messaging message')

-                     messaging_tx_failed_counter.labels(handler='waiverdb').inc()

- 

-             messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

+             product_version=product_version,

+             publish_testcase=True,

+         )

@@ -111,8 +111,8 @@ 

  

  

  parameters = [

-     ('fedmsg', 'greenwave.consumers.resultsdb.fedmsg.publish'),

-     ('fedora-messaging', 'greenwave.consumers.resultsdb.fedora_messaging.api.publish'),

+     ('fedmsg', 'greenwave.consumers.consumer.fedmsg.publish'),

+     ('fedora-messaging', 'greenwave.consumers.consumer.fedora_messaging.api.publish'),

  ]

  

  
@@ -546,7 +546,7 @@ 

      """)

  

      config = 'fedora-messaging'

-     publish = 'greenwave.consumers.resultsdb.fedora_messaging.api.publish'

+     publish = 'greenwave.consumers.consumer.fedora_messaging.api.publish'

  

      with mock.patch('greenwave.config.Config.MESSAGING', config):

          with mock.patch(publish) as mock_fedmsg:

Move common consumer code to Consumer base class.

Signed-off-by: Lukas Holecek hluk@email.cz

rebased onto 0f787c96af3283f752e13f3f3a4f1ac967523375

4 years ago

rebased onto 590f456

4 years ago

Other than my previous comment, this seems reasonable.

This is lost. Intentional?

Yes, the cache has been removed for resultsdb some time ago and this variable was left unused.

Commit b3942ce fixes this pull-request

Pull-Request has been merged by yashn

4 years ago

Pull-Request has been merged by yashn

4 years ago

Sorry I didn't review this... I was too slow. It was in my todo list :D

Sorry I didn't review this... I was too slow. It was in my todo list :D

Sorry I was unaware that this was waiting for another review. I merged it because it had a +1 from Luiz.

Eheh @yashn, no prob! It's my bad I was slow, not your bad that you merged it :D
Lui's +1 is enough. I was just planning to have a look before merging it myself :)