From 5265247cb3130ff3452747ccf177b5bffc43ff02 Mon Sep 17 00:00:00 2001 From: Giulia Naponiello Date: Feb 07 2019 12:32:52 +0000 Subject: Merge #274 `Basic monitoring` --- diff --git a/requirements.txt b/requirements.txt index 5c6e48e..a16a131 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ pytest >= 2.4.2 mock stomp.py Flask-Migrate +six # Documentation requirements sphinx @@ -26,3 +27,6 @@ requests-gssapi # Database psycopg2-binary + +# Monitoring +prometheus_client diff --git a/tests/conftest.py b/tests/conftest.py index e2ba999..d9d07a5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ from copy import copy import pytest from sqlalchemy import create_engine from waiverdb.app import create_app +from waiverdb.monitor import db_hook_event_listeners @pytest.fixture(scope='session') @@ -36,6 +37,7 @@ def db(app): connection.execute('DROP DATABASE IF EXISTS {}'.format(dbname)) connection.execute('CREATE DATABASE {}'.format(dbname)) db.create_all() + db_hook_event_listeners() return db diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 0000000..19d0c6b --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,39 @@ +# SPDX-License-Identifier: GPL-2.0+ + +import os +import pytest +import requests +import waiverdb.monitor + +from six.moves import reload_module + + +def test_metrics(client): + r = client.get('/api/v1.0/metrics') + + assert r.status_code == 200 + assert len([l for l in r.get_data(as_text=True).splitlines() + if l.startswith('# TYPE messaging_') and + l.endswith(' counter')]) == 4 + assert len([l for l in r.get_data(as_text=True).splitlines() + if l.startswith('# TYPE db_') and + l.endswith(' counter')]) == 4 + + +def test_standalone_metrics_server_disabled_by_default(): + with pytest.raises(requests.exceptions.ConnectionError): + requests.get('http://127.0.0.1:10040/metrics') + + +def test_standalone_metrics_server(): + os.environ['MONITOR_STANDALONE_METRICS_SERVER_ENABLE'] = 'true' + reload_module(waiverdb.monitor) + + r = requests.get('http://127.0.0.1:10040/metrics') + + assert len([l for l in r.text.splitlines() + if l.startswith('# TYPE messaging_') and + l.endswith(' counter')]) == 4 + assert len([l for l in r.text.splitlines() + if l.startswith('# TYPE db_') and + l.endswith(' counter')]) == 4 diff --git a/tox.ini b/tox.ini index 963f2ae..1f0089f 100644 --- a/tox.ini +++ b/tox.ini @@ -15,7 +15,10 @@ whitelist_externals = find commands = find -name *.pyc -delete - py.test tests/ + py.test {posargs} + +[pytest] +testpaths = tests/ [testenv:docs] changedir = docs diff --git a/waiverdb/api_v1.py b/waiverdb/api_v1.py index 771753e..f3c1ebd 100644 --- a/waiverdb/api_v1.py +++ b/waiverdb/api_v1.py @@ -607,9 +607,16 @@ class AboutResource(Resource): return {'version': __version__, 'auth_method': current_app.config['AUTH_METHOD']} +class MonitorResource(Resource): + def get(self): + from waiverdb.monitor import MonitorAPI + return MonitorAPI().get() + + # set up the Api resource routing here api.add_resource(WaiversResource, '/waivers/') api.add_resource(WaiverResource, '/waivers/') api.add_resource(FilteredWaiversResource, '/waivers/+filtered') api.add_resource(GetWaiversBySubjectsAndTestcases, '/waivers/+by-subjects-and-testcases') api.add_resource(AboutResource, '/about') +api.add_resource(MonitorResource, '/metrics') diff --git a/waiverdb/app.py b/waiverdb/app.py index f1546e2..b2181b1 100644 --- a/waiverdb/app.py +++ b/waiverdb/app.py @@ -20,6 +20,7 @@ from waiverdb.models import db from waiverdb.utils import json_error from flask_oidc import OpenIDConnect from werkzeug.exceptions import default_exceptions +from waiverdb.monitor import db_hook_event_listeners def load_config(app): @@ -108,6 +109,9 @@ def create_app(config_obj=None): app.after_request(insert_headers) + # initialize DB event listeners from the monitor module + app.before_first_request(db_hook_event_listeners) + return app diff --git a/waiverdb/events.py b/waiverdb/events.py index e0ef739..f9b521e 100644 --- a/waiverdb/events.py +++ b/waiverdb/events.py @@ -15,10 +15,12 @@ from flask_restful import marshal import fedmsg import stomp import json +import waiverdb.monitor as monitor + +from flask import current_app from waiverdb.fields import waiver_fields from waiverdb.models import Waiver from waiverdb.utils import stomp_connection -from flask import current_app _log = logging.getLogger(__name__) @@ -61,23 +63,45 @@ def publish_new_waiver(session): """ _log.debug('The publish_new_waiver SQLAlchemy event has been activated (%r)', current_app.config['MESSAGE_PUBLISHER']) + if current_app.config['MESSAGE_PUBLISHER'] == 'stomp': with stomp_connection() as conn: stomp_configs = current_app.config.get('STOMP_CONFIGS') for row in session.identity_map.values(): - if isinstance(row, Waiver): - _log.debug('Publishing a message for %r', row) - msg = json.dumps(marshal(row, waiver_fields)) - kwargs = dict(body=msg, headers={}, destination=stomp_configs['destination']) - if stomp.__version__[0] < 4: - kwargs['message'] = kwargs.pop('body') # On EL7, different sig. + monitor.messaging_tx_to_send_counter.inc() + if not isinstance(row, Waiver): + continue + _log.debug('Publishing a message for %r', row) + msg = json.dumps(marshal(row, waiver_fields)) + kwargs = dict(body=msg, headers={}, destination=stomp_configs['destination']) + if stomp.__version__[0] < 4: + kwargs['message'] = kwargs.pop('body') # On EL7, different sig. + try: conn.send(**kwargs) + monitor.messaging_tx_sent_ok_counter.inc() + except Exception: + _log.exception('Couldn\'t publish message via stomp') + monitor.messaging_tx_failed_counter.inc() + raise + elif current_app.config['MESSAGE_PUBLISHER'] == 'fedmsg': for row in session.identity_map.values(): - if isinstance(row, Waiver): - _log.debug('Publishing a message for %r', row) + monitor.messaging_tx_to_send_counter.inc() + if not isinstance(row, Waiver): + continue + _log.debug('Publishing a message for %r', row) + try: fedmsg.publish(topic='waiver.new', msg=marshal(row, waiver_fields)) + monitor.messaging_tx_sent_ok_counter.inc() + except Exception: + _log.exception('Couldn\'t publish message via fedmsg') + monitor.messaging_tx_failed_counter.inc() + raise + elif current_app.config['MESSAGE_PUBLISHER'] is None: _log.info('No message published. MESSAGE_PUBLISHER disabled.') + monitor.messaging_tx_stopped_counter.inc() + else: _log.warning('Unhandled MESSAGE_PUBLISHER %r', current_app.config['MESSAGE_PUBLISHER']) + monitor.messaging_tx_failed_counter.inc() diff --git a/waiverdb/monitor.py b/waiverdb/monitor.py new file mode 100644 index 0000000..e346e9e --- /dev/null +++ b/waiverdb/monitor.py @@ -0,0 +1,97 @@ +# SPDX-License-Identifier: GPL-2.0+ + +# For an up-to-date version of this module, see: +# https://pagure.io/monitor-flask-sqlalchemy + +# pylint: disable=W,unexpected-keyword-arg,no-value-for-parameter + +import os +import tempfile + +from flask import Response +from flask.views import MethodView +from prometheus_client import ( # noqa: F401 + ProcessCollector, CollectorRegistry, Counter, multiprocess, + Histogram, generate_latest, start_http_server, CONTENT_TYPE_LATEST) +from sqlalchemy import event + +# Service-specific imports + + +if not os.environ.get('prometheus_multiproc_dir'): + os.environ.setdefault('prometheus_multiproc_dir', tempfile.mkdtemp()) +registry = CollectorRegistry() +ProcessCollector(registry=registry) +multiprocess.MultiProcessCollector(registry) +if os.getenv('MONITOR_STANDALONE_METRICS_SERVER_ENABLE', 'false') == 'true': + port = os.getenv('MONITOR_STANDALONE_METRICS_SERVER_PORT', '10040') + start_http_server(int(port), registry=registry) + + +# Generic metrics +messaging_tx_to_send_counter = Counter( + 'messaging_tx_to_send', + 'Total number of messages to send', + registry=registry) +messaging_tx_stopped_counter = Counter( + 'messaging_tx_stopped', + 'Number of messages, which were eventually stopped before sending', + registry=registry) +messaging_tx_sent_ok_counter = Counter( + 'messaging_tx_sent_ok', + 'Number of messages, which were sent successfully', + registry=registry) +messaging_tx_failed_counter = Counter( + 'messaging_tx_failed', + 'Number of messages, for which the sender failed', + registry=registry) + +db_dbapi_error_counter = Counter( + 'db_dbapi_error', + 'Number of DBAPI errors', + registry=registry) +db_engine_connect_counter = Counter( + 'db_engine_connect', + 'Number of \'engine_connect\' events', + registry=registry) +db_handle_error_counter = Counter( + 'db_handle_error', + 'Number of exceptions during connection', + registry=registry) +db_transaction_rollback_counter = Counter( + 'db_transaction_rollback', + 'Number of transactions, which were rolled back', + registry=registry) + +# Service-specific metrics +# XXX: TODO + + +def db_hook_event_listeners(target=None): + # Service-specific import of db + from waiverdb.models import db + + if not target: + target = db.engine + + @event.listens_for(target, 'dbapi_error', named=True) + def receive_dbapi_error(**kw): + db_dbapi_error_counter.inc() + + @event.listens_for(target, 'engine_connect') + def receive_engine_connect(conn, branch): + db_engine_connect_counter.inc() + + @event.listens_for(target, 'handle_error') + def receive_handle_error(exception_context): + db_handle_error_counter.inc() + + @event.listens_for(target, 'rollback') + def receive_rollback(conn): + db_transaction_rollback_counter.inc() + + +class MonitorAPI(MethodView): + def get(self): + return Response(generate_latest(registry), + content_type=CONTENT_TYPE_LATEST)