| |
@@ -12,28 +12,10 @@
|
| |
import logging
|
| |
import re
|
| |
|
| |
- import fedmsg.consumers
|
| |
- import requests
|
| |
-
|
| |
- import greenwave.app_factory
|
| |
- import greenwave.resources
|
| |
- from greenwave.api_v1 import subject_type_identifier_to_list
|
| |
- from greenwave.monitor import (
|
| |
- publish_decision_exceptions_result_counter,
|
| |
- messaging_tx_to_send_counter, messaging_tx_stopped_counter,
|
| |
- messaging_tx_sent_ok_counter, messaging_tx_failed_counter)
|
| |
- from greenwave.policies import applicable_decision_context_product_version_pairs
|
| |
- from greenwave.utils import right_before_this_time
|
| |
+ from greenwave.consumers.consumer import Consumer
|
| |
|
| |
import xmlrpc.client
|
| |
|
| |
- try:
|
| |
- import fedora_messaging.api
|
| |
- import fedora_messaging.exceptions
|
| |
- except ImportError:
|
| |
- pass
|
| |
-
|
| |
-
|
| |
log = logging.getLogger(__name__)
|
| |
|
| |
|
| |
@@ -93,39 +75,7 @@
|
| |
pass
|
| |
|
| |
|
| |
- def _equals_except_keys(lhs, rhs, except_keys):
|
| |
- keys = lhs.keys() - except_keys
|
| |
- return lhs.keys() == rhs.keys() \
|
| |
- and all(lhs[key] == rhs[key] for key in keys)
|
| |
-
|
| |
-
|
| |
- def _is_decision_unchanged(old_decision, decision):
|
| |
- """
|
| |
- Returns true only if new decision is same as old one
|
| |
- (ignores result_id values).
|
| |
- """
|
| |
- if old_decision is None or decision is None:
|
| |
- return old_decision == decision
|
| |
-
|
| |
- requirements_keys = ('satisfied_requirements', 'unsatisfied_requirements')
|
| |
- if not _equals_except_keys(old_decision, decision, requirements_keys):
|
| |
- return False
|
| |
-
|
| |
- ignore_keys = ('result_id',)
|
| |
- for key in requirements_keys:
|
| |
- old_requirements = old_decision[key]
|
| |
- requirements = decision[key]
|
| |
- if len(old_requirements) != len(requirements):
|
| |
- return False
|
| |
-
|
| |
- for old_requirement, requirement in zip(old_requirements, requirements):
|
| |
- if not _equals_except_keys(old_requirement, requirement, ignore_keys):
|
| |
- return False
|
| |
-
|
| |
- return True
|
| |
-
|
| |
-
|
| |
- class ResultsDBHandler(fedmsg.consumers.FedmsgConsumer):
|
| |
+ class ResultsDBHandler(Consumer):
|
| |
"""
|
| |
Handle a new result.
|
| |
|
| |
@@ -134,29 +84,12 @@
|
| |
"""
|
| |
|
| |
config_key = 'resultsdb_handler'
|
| |
+ hub_config_prefix = 'resultsdb_'
|
| |
+ default_topic = 'taskotron.result.new'
|
| |
+ monitor_labels = {'handler': 'resultsdb'}
|
| |
|
| |
- def __init__(self, hub, *args, **kwargs):
|
| |
- """
|
| |
- Initialize the ResultsDBHandler, subscribing it to the appropriate topics.
|
| |
-
|
| |
- Args:
|
| |
- hub (moksha.hub.hub.CentralMokshaHub): The hub from which this handler is consuming
|
| |
- messages. It is used to look up the hub config.
|
| |
- """
|
| |
-
|
| |
- prefix = hub.config.get('topic_prefix')
|
| |
- env = hub.config.get('environment')
|
| |
- suffix = hub.config.get('resultsdb_topic_suffix', 'taskotron.result.new')
|
| |
- self.topic = ['.'.join([prefix, env, suffix])]
|
| |
- self.fedmsg_config = fedmsg.config.load_config()
|
| |
-
|
| |
- config = kwargs.pop('config', None)
|
| |
-
|
| |
- super(ResultsDBHandler, self).__init__(hub, *args, **kwargs)
|
| |
-
|
| |
- self.flask_app = greenwave.app_factory.create_app(config)
|
| |
- self.greenwave_api_url = self.flask_app.config['GREENWAVE_API_URL']
|
| |
- self.cache = self.flask_app.cache
|
| |
+ def __init__(self, *args, **kwargs):
|
| |
+ super().__init__(*args, **kwargs)
|
| |
|
| |
koji_base_url = self.flask_app.config['KOJI_BASE_URL']
|
| |
if koji_base_url:
|
| |
@@ -164,8 +97,6 @@
|
| |
else:
|
| |
self.koji_proxy = None
|
| |
|
| |
- log.info('Greenwave resultsdb handler listening on: %s', self.topic)
|
| |
-
|
| |
@staticmethod
|
| |
def announcement_subjects(message):
|
| |
"""
|
| |
@@ -213,17 +144,9 @@
|
| |
elif 'item' in data and _type:
|
| |
yield (_type, _decode(data['item']))
|
| |
|
| |
- def consume(self, message):
|
| |
- """
|
| |
- Process the given message and take action.
|
| |
-
|
| |
- Args:
|
| |
- message (munch.Munch): A fedmsg about a new result.
|
| |
- """
|
| |
- message = message.get('body', message)
|
| |
- log.debug('Processing message "%s"', message)
|
| |
-
|
| |
+ def _consume_message(self, message):
|
| |
msg = message['msg']
|
| |
+
|
| |
try:
|
| |
testcase = msg['testcase']['name']
|
| |
except KeyError:
|
| |
@@ -234,111 +157,17 @@
|
| |
except KeyError:
|
| |
submit_time = msg['result']['submit_time']
|
| |
|
| |
- with self.flask_app.app_context():
|
| |
- for subject_type, subject_identifier in self.announcement_subjects(message):
|
| |
- log.debug('Considering subject %s: %r', subject_type, subject_identifier)
|
| |
- self._publish_decision_changes(subject_type, subject_identifier,
|
| |
- submit_time, testcase)
|
| |
-
|
| |
- @publish_decision_exceptions_result_counter.count_exceptions()
|
| |
- def _publish_decision_changes(self, subject_type, subject_identifier, submit_time, testcase):
|
| |
- """
|
| |
- Process the given subject and publish a message if the decision is changed.
|
| |
+ for subject_type, subject_identifier in self.announcement_subjects(message):
|
| |
+ log.debug('Considering subject %s: %r', subject_type, subject_identifier)
|
| |
|
| |
- Args:
|
| |
- subject_type (munch.Munch): subject type argument, used to query greenwave.
|
| |
- subject_identifier (munch.Munch): subject identifier argument, used to query greenwave.
|
| |
- submit_time (string): date. After this date, results will be ignored for comparison.
|
| |
- testcase (munch.Munch): the name of a testcase to consider.
|
| |
- """
|
| |
- policy_attributes = dict(
|
| |
- subject_type=subject_type,
|
| |
- subject_identifier=subject_identifier,
|
| |
- testcase=testcase,
|
| |
- )
|
| |
-
|
| |
- product_version = _subject_product_version(
|
| |
- subject_identifier, subject_type, self.koji_proxy)
|
| |
- if product_version:
|
| |
- policy_attributes['product_version'] = product_version
|
| |
-
|
| |
- policies = self.flask_app.config['policies']
|
| |
- contexts_product_versions = applicable_decision_context_product_version_pairs(
|
| |
- policies, **policy_attributes)
|
| |
-
|
| |
- log.info('Getting greenwave info')
|
| |
-
|
| |
- for decision_context, product_version in sorted(contexts_product_versions):
|
| |
- messaging_tx_to_send_counter.labels(handler='resultsdb').inc()
|
| |
- greenwave_url = self.greenwave_api_url + '/decision'
|
| |
-
|
| |
- data = {
|
| |
- 'decision_context': decision_context,
|
| |
- 'product_version': product_version,
|
| |
- 'subject_type': subject_type,
|
| |
- 'subject_identifier': subject_identifier,
|
| |
- }
|
| |
+ product_version = _subject_product_version(
|
| |
+ subject_identifier, subject_type, self.koji_proxy)
|
| |
|
| |
- try:
|
| |
- log.debug('querying greenwave at: %s', greenwave_url)
|
| |
- decision = greenwave.resources.retrieve_decision(greenwave_url, data)
|
| |
-
|
| |
- # get old decision
|
| |
- data.update({
|
| |
- 'when': right_before_this_time(submit_time),
|
| |
- })
|
| |
- old_decision = greenwave.resources.retrieve_decision(greenwave_url, data)
|
| |
- log.debug('old decision: %s', old_decision)
|
| |
- except requests.exceptions.HTTPError as e:
|
| |
- log.exception('Failed to retrieve decision for data=%s, error: %s', data, e)
|
| |
- messaging_tx_stopped_counter.labels(handler='resultsdb').inc()
|
| |
- continue
|
| |
-
|
| |
- if _is_decision_unchanged(old_decision, decision):
|
| |
- log.debug('Skipped emitting fedmsg, decision did not change: %s', decision)
|
| |
- messaging_tx_stopped_counter.labels(handler='resultsdb').inc()
|
| |
- continue
|
| |
-
|
| |
- decision.update({
|
| |
- 'subject_type': subject_type,
|
| |
- 'subject_identifier': subject_identifier,
|
| |
- # subject is for backwards compatibility only:
|
| |
- 'subject': subject_type_identifier_to_list(subject_type,
|
| |
- subject_identifier),
|
| |
- 'decision_context': decision_context,
|
| |
- 'product_version': product_version,
|
| |
- 'previous': old_decision,
|
| |
- })
|
| |
- log.info(
|
| |
- 'Emitting a message on the bus, %r, with the topic '
|
| |
- '"greenwave.decision.update"', decision)
|
| |
- if self.flask_app.config['MESSAGING'] == 'fedmsg':
|
| |
- log.debug(' - to fedmsg')
|
| |
- try:
|
| |
- fedmsg.publish(topic='decision.update', msg=decision)
|
| |
- messaging_tx_sent_ok_counter.labels(handler='resultsdb').inc()
|
| |
- except Exception:
|
| |
- messaging_tx_failed_counter.labels(handler='resultsdb').inc()
|
| |
- raise
|
| |
- elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':
|
| |
- log.debug(' - to fedora-messaging')
|
| |
- try:
|
| |
- msg = fedora_messaging.api.Message(
|
| |
- topic='greenwave.decision.update',
|
| |
- body=decision
|
| |
- )
|
| |
- fedora_messaging.api.publish(msg)
|
| |
- messaging_tx_sent_ok_counter.labels(handler='resultsdb').inc()
|
| |
- except fedora_messaging.exceptions.PublishReturned as e:
|
| |
- log.warning(
|
| |
- 'Fedora Messaging broker rejected message %s: %s',
|
| |
- msg.id, e)
|
| |
- messaging_tx_stopped_counter.labels(handler='resultsdb').inc()
|
| |
- except fedora_messaging.exceptions.ConnectionException as e:
|
| |
- log.warning('Error sending message %s: %s', msg.id, e)
|
| |
- messaging_tx_failed_counter.labels(handler='resultsdb').inc()
|
| |
- except Exception: # pylint: disable=broad-except
|
| |
- log.exception('Error sending fedora-messaging message')
|
| |
- messaging_tx_failed_counter.labels(handler='resultsdb').inc()
|
| |
-
|
| |
- messaging_tx_stopped_counter.labels(handler='resultsdb').inc()
|
| |
+ self._publish_decision_change(
|
| |
+ submit_time=submit_time,
|
| |
+ subject_type=subject_type,
|
| |
+ subject_identifier=subject_identifier,
|
| |
+ testcase=testcase,
|
| |
+ product_version=product_version,
|
| |
+ publish_testcase=False,
|
| |
+ )
|
| |
 This is lost in the change. Was that intentional?