#427 Re-work monitor module incl. messaging counters
Merged 5 years ago by gnaponie. Opened 5 years ago by fivaldi.
fivaldi/greenwave fivaldi_monitor_module  into  master

@@ -0,0 +1,33 @@ 

+ # SPDX-License-Identifier: GPL-2.0+

+ 

+ import importlib

+ import os

+ import pytest

+ import requests

+ import greenwave.monitor

+ 

+ 

+ min_num_of_metrics = 23

+ 

+ 

+ def test_metrics(requests_session, greenwave_server):

+     r = requests_session.get(greenwave_server + 'api/v1.0/metrics')

+ 

+     assert r.status_code == 200

+     assert len([l for l in r.text.splitlines()

+                 if l.startswith('# TYPE')]) >= min_num_of_metrics

+ 

+ 

+ def test_standalone_metrics_server_disabled_by_default(requests_session):

+     with pytest.raises(requests.exceptions.ConnectionError):

+         requests_session.get('http://127.0.0.1:10040/metrics')

+ 

+ 

+ def test_standalone_metrics_server(requests_session):

+     os.environ['MONITOR_STANDALONE_METRICS_SERVER_ENABLE'] = 'true'

+     importlib.reload(greenwave.monitor)

+ 

+     r = requests_session.get('http://127.0.0.1:10040/metrics')

+ 

+     assert len([l for l in r.text.splitlines()

+                 if l.startswith('# TYPE')]) >= min_num_of_metrics

file modified
+1 -1
@@ -11,7 +11,7 @@ 

  from greenwave.resources import ResultsRetriever, retrieve_waivers

  from greenwave.safe_yaml import SafeYAMLError

  from greenwave.utils import insert_headers, jsonp

- from greenwave.monitoring import (

+ from greenwave.monitor import (

      registry,

      decision_exception_counter,

      decision_request_duration_seconds,

@@ -5,6 +5,7 @@ 

  

  from flask import Flask

  from greenwave.api_v1 import api

+ from greenwave.monitor import monitor_api

  from greenwave.utils import json_error, load_config, sha1_mangle_key

  from greenwave.policies import load_policies, RemoteRule

  
@@ -63,6 +64,7 @@ 

  

      # register blueprints

      app.register_blueprint(api, url_prefix="/api/v1.0")

+     app.register_blueprint(monitor_api, url_prefix="/api/v1.0")

What's the reason for adding another blueprint? Couldn't we just use the "api" one?
Is it just to keep the two things separated?

The reason is to avoid "grafting" the monitor module in a different way for each F2.0 project, as there are slight differences in the Flask API implementations. The blueprint approach appears to be universal and simple. If the monitor module is extended with other functionality, it will work on most F2.0 projects out of the box.

      app.add_url_rule('/healthcheck', view_func=healthcheck)

  

      # Initialize the cache.

@@ -13,6 +13,9 @@ 

  

  from greenwave.consumers.resultsdb import ResultsDBHandler

  from greenwave.consumers.waiverdb import WaiverDBHandler

+ from greenwave.monitor import (

+     messaging_rx_counter, messaging_rx_ignored_counter,

What does "rx" stand for?

@gnaponie it means "received", but it may be worth renaming it

Exactly, "rx" means receive(d), "tx" means transmit(ted). They're common in communication terminology: transferring data, packets, frames, etc. From my point of view, these two very important abbreviations should not be forgotten. :-)

+     messaging_rx_processed_ok_counter, messaging_rx_failed_counter)

  

  from fedora_messaging.config import conf

  
@@ -41,9 +44,11 @@ 

      """

      log.info(

          'Received message from fedora-messaging with topic: %s', message.topic)

+     messaging_rx_counter.inc()

      consumer_config = conf["consumer_config"]

      if message.topic.endswith("taskotron.result.new"):

          # New resultsdb results

+         messaging_rx_counter.labels(handler="resultsdb").inc()

          config = {

              "topic_prefix": consumer_config["topic_prefix"],

              "environment": consumer_config["environment"],
@@ -53,10 +58,16 @@ 

          handler = ResultsDBHandler(hub)

          msg = {"body": {'msg': message.body}}

          log.info('Sending message received to: ResultsDBHandler')

-         handler.consume(msg)

+         try:

+             handler.consume(msg)

+             messaging_rx_processed_ok_counter.labels(handler="resultsdb").inc()

+         except Exception:

+             messaging_rx_failed_counter.labels(handler="resultsdb").inc()

+             raise

  

      elif message.topic.endswith('waiver.new'):

          # New waiver submitted

+         messaging_rx_counter.labels(handler="waiverdb").inc()

          config = {

              "topic_prefix": consumer_config["topic_prefix"],

              "environment": consumer_config["environment"],
@@ -66,4 +77,11 @@ 

          handler = WaiverDBHandler(hub)

          msg = {"body": {'msg': message.body}}

          log.info('Sending message received to: WaiverDBHandler')

-         handler.consume(msg)

+         try:

+             handler.consume(msg)

+             messaging_rx_processed_ok_counter.labels(handler="waiverdb").inc()

+         except Exception:

+             messaging_rx_failed_counter.labels(handler="waiverdb").inc()

+             raise

+ 

+     messaging_rx_ignored_counter.inc()

@@ -18,7 +18,10 @@ 

  import greenwave.app_factory

  import greenwave.resources

  from greenwave.api_v1 import subject_type_identifier_to_list

- from greenwave.monitoring import publish_decision_exceptions_result_counter

+ from greenwave.monitor import (

+     publish_decision_exceptions_result_counter,

+     messaging_tx_to_send_counter, messaging_tx_stopped_counter,

Same question for "tx" :)

@gnaponie it means "transmitted"

+     messaging_tx_sent_ok_counter, messaging_tx_failed_counter)

  from greenwave.policies import applicable_decision_context_product_version_pairs

  

  import xmlrpc.client
@@ -276,6 +279,7 @@ 

          log.info('Getting greenwave info')

  

          for decision_context, product_version in sorted(contexts_product_versions):

+             messaging_tx_to_send_counter.labels(handler='resultsdb').inc()

              greenwave_url = self.greenwave_api_url + '/decision'

  

              data = {
@@ -297,40 +301,54 @@ 

                  log.debug('old decision: %s', old_decision)

              except requests.exceptions.HTTPError as e:

                  log.exception('Failed to retrieve decision for data=%s, error: %s', data, e)

+                 messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

                  continue

  

              if _is_decision_unchanged(old_decision, decision):

                  log.debug('Skipped emitting fedmsg, decision did not change: %s', decision)

-             else:

-                 decision.update({

-                     'subject_type': subject_type,

-                     'subject_identifier': subject_identifier,

-                     # subject is for backwards compatibility only:

-                     'subject': subject_type_identifier_to_list(subject_type,

-                                                                subject_identifier),

-                     'decision_context': decision_context,

-                     'product_version': product_version,

-                     'previous': old_decision,

-                 })

-                 log.info(

-                     'Emitted a message on the bus, %r, with the topic '

-                     '"greenwave.decision.update"', decision)

-                 if self.flask_app.config['MESSAGING'] == 'fedmsg':

-                     log.debug('  - to fedmsg')

+                 messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

+                 continue

+ 

+             decision.update({

+                 'subject_type': subject_type,

+                 'subject_identifier': subject_identifier,

+                 # subject is for backwards compatibility only:

+                 'subject': subject_type_identifier_to_list(subject_type,

+                                                            subject_identifier),

+                 'decision_context': decision_context,

+                 'product_version': product_version,

+                 'previous': old_decision,

+             })

+             log.info(

+                 'Emitting a message on the bus, %r, with the topic '

+                 '"greenwave.decision.update"', decision)

+             if self.flask_app.config['MESSAGING'] == 'fedmsg':

+                 log.debug('  - to fedmsg')

+                 try:

                      fedmsg.publish(topic='decision.update', msg=decision)

-                 elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':

-                     log.debug('  - to fedora-messaging')

-                     try:

-                         msg = fedora_messaging.api.Message(

-                             topic='greenwave.decision.update',

-                             body=decision

-                         )

-                         fedora_messaging.api.publish(msg)

-                     except fedora_messaging.exceptions.PublishReturned as e:

-                         log.warning(

-                             'Fedora Messaging broker rejected message %s: %s',

-                             msg.id, e)

-                     except fedora_messaging.exceptions.ConnectionException as e:

-                         log.warning('Error sending message %s: %s', msg.id, e)

-                     except Exception:  # pylint: disable=broad-except

-                         log.exception('Error sending fedora-messaging message')

+                     messaging_tx_sent_ok_counter.labels(handler='resultsdb').inc()

+                 except Exception:

+                     messaging_tx_failed_counter.labels(handler='resultsdb').inc()

+                     raise

+             elif self.flask_app.config['MESSAGING'] == 'fedora-messaging':

+                 log.debug('  - to fedora-messaging')

+                 try:

+                     msg = fedora_messaging.api.Message(

+                         topic='greenwave.decision.update',

+                         body=decision

+                     )

+                     fedora_messaging.api.publish(msg)

+                     messaging_tx_sent_ok_counter.labels(handler='resultsdb').inc()

+                 except fedora_messaging.exceptions.PublishReturned as e:

+                     log.warning(

+                         'Fedora Messaging broker rejected message %s: %s',

+                         msg.id, e)

+                     messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

+                 except fedora_messaging.exceptions.ConnectionException as e:

+                     log.warning('Error sending message %s: %s', msg.id, e)

+                     messaging_tx_failed_counter.labels(handler='resultsdb').inc()

+                 except Exception:  # pylint: disable=broad-except

+                     log.exception('Error sending fedora-messaging message')

+                     messaging_tx_failed_counter.labels(handler='resultsdb').inc()

+ 

+             messaging_tx_stopped_counter.labels(handler='resultsdb').inc()

file modified
+54 -35
@@ -17,7 +17,10 @@ 

  

  import greenwave.app_factory

  from greenwave.api_v1 import subject_type_identifier_to_list

- from greenwave.monitoring import publish_decision_exceptions_waiver_counter

+ from greenwave.monitor import (

+     publish_decision_exceptions_waiver_counter,

+     messaging_tx_to_send_counter, messaging_tx_stopped_counter,

+     messaging_tx_sent_ok_counter, messaging_tx_failed_counter)

  from greenwave.policies import applicable_decision_context_product_version_pairs

  

  try:
@@ -98,6 +101,7 @@ 

              product_version=product_version)

  

          for decision_context, product_version in sorted(contexts_product_versions):

+             messaging_tx_to_send_counter.labels(handler='waiverdb').inc()

              data = {

                  'decision_context': decision_context,

                  'product_version': product_version,
@@ -111,6 +115,7 @@ 

  

              if not response.ok:

                  log.error(response.text)

+                 messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

                  continue

  

              decision = response.json()
@@ -126,44 +131,58 @@ 

  

              if not response.ok:

                  log.error(response.text)

+                 messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

                  continue

  

              old_decision = response.json()

  

              if decision == old_decision:

                  log.debug('Skipped emitting fedmsg, decision did not change: %s', decision)

-             else:

-                 msg = decision

-                 decision.update({

-                     'subject_type': subject_type,

-                     'subject_identifier': subject_identifier,

-                     # subject is for backwards compatibility only:

-                     'subject': subject_type_identifier_to_list(subject_type,

-                                                                subject_identifier),

-                     'testcase': testcase,

-                     'decision_context': decision_context,

-                     'product_version': product_version,

-                     'previous': old_decision,

-                 })

-                 log.info(

-                     'Emitted a message on the bus, %r, with the topic '

-                     '"greenwave.decision.update"', decision)

-                 if self.flask_app.config['MESSAGING'] == 'fedmsg':

-                     log.debug('  - to fedmsg')

+                 messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

+                 continue

+ 

+             msg = decision

+             decision.update({

+                 'subject_type': subject_type,

+                 'subject_identifier': subject_identifier,

+                 # subject is for backwards compatibility only:

+                 'subject': subject_type_identifier_to_list(subject_type,

+                                                            subject_identifier),

+                 'testcase': testcase,

+                 'decision_context': decision_context,

+                 'product_version': product_version,

+                 'previous': old_decision,

+             })

+             log.info(

+                 'Emitting a message on the bus, %r, with the topic '

+                 '"greenwave.decision.update"', decision)

+             if self.flask_app.config['MESSAGING'] == 'fedmsg':

+                 log.debug('  - to fedmsg')

+                 try:

                      fedmsg.publish(topic='decision.update', msg=msg)

-                 elif self.flask_app.config['MESSAGING'] == 'fedora-message':

-                     log.debug('  - to fedora-messaging')

-                     try:

-                         msg = fedora_messaging.api.Message(

-                             topic='greenwave.decision.update',

-                             body=msg

-                         )

-                         fedora_messaging.api.publish(msg)

-                     except fedora_messaging.exceptions.PublishReturned as e:

-                         log.warning(

-                             'Fedora Messaging broker rejected message %s: %s',

-                             msg.id, e)

-                     except fedora_messaging.exceptions.ConnectionException as e:

-                         log.warning('Error sending message %s: %s', msg.id, e)

-                     except Exception:  # pylint: disable=broad-except

-                         log.exception('Error sending fedora-messaging message')

+                     messaging_tx_sent_ok_counter.labels(handler='waiverdb').inc()

+                 except Exception:

+                     messaging_tx_failed_counter.labels(handler='waiverdb').inc()

+                     raise

+             elif self.flask_app.config['MESSAGING'] == 'fedora-message':

+                 log.debug('  - to fedora-messaging')

+                 try:

+                     msg = fedora_messaging.api.Message(

+                         topic='greenwave.decision.update',

+                         body=msg

+                     )

+                     fedora_messaging.api.publish(msg)

+                     messaging_tx_sent_ok_counter.labels(handler='waiverdb').inc()

+                 except fedora_messaging.exceptions.PublishReturned as e:

+                     log.warning(

+                         'Fedora Messaging broker rejected message %s: %s',

+                         msg.id, e)

+                     messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

+                 except fedora_messaging.exceptions.ConnectionException as e:

+                     log.warning('Error sending message %s: %s', msg.id, e)

+                     messaging_tx_failed_counter.labels(handler='waiverdb').inc()

+                 except Exception:  # pylint: disable=broad-except

+                     log.exception('Error sending fedora-messaging message')

+                     messaging_tx_failed_counter.labels(handler='waiverdb').inc()

+ 

+             messaging_tx_stopped_counter.labels(handler='waiverdb').inc()

file added
+98
@@ -0,0 +1,98 @@ 

+ # SPDX-License-Identifier: GPL-2.0+

+ 

+ # For an up-to-date version of this module, see:

+ #   https://pagure.io/monitor-flask-sqlalchemy

+ 

+ import os

+ import tempfile

+ 

+ from flask import Blueprint, Response

+ from prometheus_client import (  # noqa: F401

+     ProcessCollector, CollectorRegistry, Counter, multiprocess,

+     Histogram, generate_latest, start_http_server, CONTENT_TYPE_LATEST)

+ 

+ # Service-specific imports

+ 

+ 

+ if not os.environ.get('prometheus_multiproc_dir'):

+     os.environ.setdefault('prometheus_multiproc_dir', tempfile.mkdtemp())

+ registry = CollectorRegistry()

+ ProcessCollector(registry=registry)

+ multiprocess.MultiProcessCollector(registry)

+ if os.getenv('MONITOR_STANDALONE_METRICS_SERVER_ENABLE', 'false') == 'true':

+     port = os.getenv('MONITOR_STANDALONE_METRICS_SERVER_PORT', '10040')

+     start_http_server(int(port), registry=registry)

+ 

+ 

+ # Generic metrics

+ messaging_rx_counter = Counter(

+     'messaging_rx',

+     'Total number of messages received',

+     labelnames=['handler'],

+     registry=registry)

+ messaging_rx_ignored_counter = Counter(

+     'messaging_rx_ignored',

+     'Number of received messages, which were ignored',

+     registry=registry)

+ messaging_rx_processed_ok_counter = Counter(

+     'messaging_rx_processed_ok',

+     'Number of received messages, which were processed successfully',

+     labelnames=['handler'],

+     registry=registry)

+ messaging_rx_failed_counter = Counter(

+     'messaging_rx_failed',

+     'Number of received messages, which failed during processing',

+     labelnames=['handler'],

+     registry=registry)

+ 

+ messaging_tx_to_send_counter = Counter(

+     'messaging_tx_to_send',

+     'Total number of messages to send',

+     labelnames=['handler'],

+     registry=registry)

+ messaging_tx_stopped_counter = Counter(

+     'messaging_tx_stopped',

+     'Number of messages, which were eventually stopped before sending',

+     labelnames=['handler'],

+     registry=registry)

+ messaging_tx_sent_ok_counter = Counter(

+     'messaging_tx_sent_ok',

+     'Number of messages, which were sent successfully',

+     labelnames=['handler'],

+     registry=registry)

+ messaging_tx_failed_counter = Counter(

+     'messaging_tx_failed',

+     'Number of messages, for which the sender failed',

+     labelnames=['handler'],

+     registry=registry)

+ 

+ # Service-specific metrics

+ # https://github.com/prometheus/client_python/issues/210

+ # pylint: disable-msg=unexpected-keyword-arg,no-value-for-parameter

+ decision_exception_counter = Counter(

+     'total_decision_exceptions',

+     'All exceptions occurred in Greenwave "decision" API',

+     registry=registry)

+ decision_request_duration_seconds = Histogram(

+     'decision_request_duration_seconds',

+     'Decision latency',

+     registry=registry)

+ publish_decision_exceptions_waiver_counter = Counter(

+     'publish_decision_exceptions_new_waiver',

+     'All exceptions occurred in publishing a message after a new waiver',

+     registry=registry)

+ publish_decision_exceptions_result_counter = Counter(

+     'publish_decision_exceptions_new_result',

+     'All exceptions occurred in publishing a message after a new result',

+     registry=registry)

+ 

+ 

+ monitor_api = Blueprint(

+     'monitor', __name__,

+     url_prefix='')

+ 

+ 

+ @monitor_api.route('/metrics')

+ def metrics():

+     return Response(generate_latest(registry),

+                     content_type=CONTENT_TYPE_LATEST)

file removed
-30
@@ -1,30 +0,0 @@ 

- # SPDX-License-Identifier: GPL-2.0+

- 

- import os

- from prometheus_client import Counter, Histogram, multiprocess, CollectorRegistry

- 

- 

- # tmp dir for Prometheus monitoring registry.

- # Putting this here and not in the "create_app" function, because "create_app" imports the api

- # ...so the check for this env variable would be made before the "create_app" can be able to set it

- # This is executed only once at server starting, so it is not so bad for performace.

- if not os.environ.get('prometheus_multiproc_dir'):

-     os.environ.setdefault('prometheus_multiproc_dir', '/tmp')

- registry = CollectorRegistry()

- multiprocess.MultiProcessCollector(registry)

- # https://github.com/prometheus/client_python/issues/210

- # pylint: disable-msg=unexpected-keyword-arg,no-value-for-parameter

- decision_exception_counter = Counter('total_decision_exceptions', ('All exceptions occurred in '

-                                                                    'Greenwave "decision" API'),

-                                      registry=registry)

- decision_request_duration_seconds = Histogram('decision_request_duration_seconds',

-                                               'Decision latency',

-                                               registry=registry)

- publish_decision_exceptions_waiver_counter = Counter('publish_decision_exceptions_new_waiver',

-                                                      ('All exceptions occurred in publishing a '

-                                                       'message after a new waiver'),

-                                                      registry=registry)

- publish_decision_exceptions_result_counter = Counter('publish_decision_exceptions_new_result',

-                                                      ('All exceptions occurred in publishing a '

-                                                       'message after a new result'),

-                                                      registry=registry)

no initial comment

rebased onto 28e30261e353bf5a026cd9751f7b9bb31548fe64

5 years ago

rebased onto 5eefffa

5 years ago

@gnaponie Thanks. The number of metrics appears to differ between environments, depending mainly on the prometheus_client library version. I changed the check to use min_num_of_metrics, which should work now. In the future, I need to think of a better, yet simple, way of testing the metrics.

Hello fivaldi. Sorry for the late reply.
It still doesn't seem to work for me :(
Could we find a better way to be sure this will always work?
These are my errors:
https://paste.fedoraproject.org/paste/avtKU~8NX2bK3t61mM2tqA

@gnaponie Seems like you're not testing against the latest revision in the branch: 5eefffa

Otherwise it would pass for you, because there's the >= (greater than or equals) in the test(s).

Yeah, you're right. Sorry about that. They work now, thanks. I'll make another review now.

What's the reason for adding another blueprint? Couldn't we just use the "api" one?
Is it just to keep the two things separated?

Besides those comments (I'm just curious about them), the PR looks fine!

@gnaponie it means "received", but it may be worth renaming it

The reason is to avoid "grafting" the monitor module in a different way for each F2.0 project, as there are slight differences in the Flask API implementations. The blueprint approach appears to be universal and simple. If the monitor module is extended with other functionality, it will work on most F2.0 projects out of the box.

Exactly, "rx" means receive(d), "tx" means transmit(ted). They're common in communication terminology: transferring data, packets, frames, etc. From my point of view, these two very important abbreviations should not be forgotten. :-)

It looks good. I think we can merge it!

Commit 7508692 fixes this pull-request

Pull-Request has been merged by gnaponie

5 years ago

Pull-Request has been merged by gnaponie

5 years ago