| |
@@ -22,12 +22,23 @@
|
| |
#
|
| |
# Written by Jan Kaluza <jkaluza@redhat.com>
|
| |
|
| |
+ import time
|
| |
+ import threading
|
| |
from sqlalchemy import func
|
| |
- from prometheus_client import CollectorRegistry
|
| |
+ from prometheus_client import CollectorRegistry, Gauge
|
| |
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
|
| |
|
| |
from odcs.common.types import COMPOSE_STATES, PUNGI_SOURCE_TYPE_NAMES
|
| |
from odcs.server.models import Compose
|
| |
+ from odcs.server import log, conf, app
|
| |
+
|
| |
+ try:
|
| |
+ from odcs.server.celery_tasks import celery_app
|
| |
+ from celery.utils.objects import FallbackContext
|
| |
+ import amqp.exceptions
|
| |
+ CELERY_AVAILABLE = True
|
| |
+ except ImportError:
|
| |
+ CELERY_AVAILABLE = False
|
| |
|
| |
|
| |
registry = CollectorRegistry()
|
| |
@@ -93,4 +104,113 @@
|
| |
yield self.raw_config_composes_count()
|
| |
|
| |
|
| |
class WorkerCountThread(threading.Thread):
    """
    Thread providing and updating following metrics:

    - celery_workers_totals - Number of alive workers.
    - celery_workers[worker_name] - 1 if worker is online, 0 if offline.

    The thread is daemonic and refreshes the metrics every 30 seconds by
    pinging the Celery workers.
    """

    def __init__(self, registry=None):
        """
        Create the gauges and initialise the total worker count to 0.

        :param registry: Prometheus registry passed through to both
            ``Gauge`` constructors.
        """
        super(WorkerCountThread, self).__init__()
        self.daemon = True
        self.workers_total = Gauge(
            "celery_workers_totals", "Number of alive workers", registry=registry
        )
        self.workers_total.set(0)
        self.workers = Gauge(
            "celery_workers", "Number of alive workers", ["worker_name"],
            registry=registry
        )
        # Every worker name ever seen in a ping reply. Names are never
        # removed, so a worker which stops replying is reported as 0.
        self.worker_names = set()

    def update_metrics(self):
        """
        Ping the Celery workers once and refresh both gauges.

        If the ping itself fails, the error is logged and the gauges keep
        their previous values.
        """
        log.info("[metrics] Getting number of workers.")
        try:
            celery_ping = celery_app.control.ping(timeout=15)
        except Exception:  # pragma: no cover
            log.exception("[metrics] Error pinging workers.")
            return

        # Set total number of workers (one ping reply per worker).
        self.workers_total.set(len(celery_ping))

        # Each reply is a mapping keyed by worker name.
        online = set()
        for reply in celery_ping:
            online.update(reply.keys())
        self.worker_names |= online

        # Write the final value for every known worker exactly once. Resetting
        # all gauges to 0 first and then flipping the online ones back to 1
        # would let a concurrent scrape observe a live worker as offline.
        for worker_name in self.worker_names:
            self.workers.labels(worker_name).set(1 if worker_name in online else 0)

    def run(self):
        """Refresh the metrics forever, sleeping 30 seconds between rounds."""
        while True:
            self.update_metrics()
            time.sleep(30)
|
| |
+
|
| |
+
|
| |
class QueueLengthThread(threading.Thread):
    """
    Daemonic thread keeping the following metric up to date:

    - celery_queue_length[queue_name] - Number of tasks waiting in the queue.
    """

    def __init__(self, registry=None):
        """
        Create the gauge, collect the queue names and get a Celery connection.

        :param registry: Prometheus registry passed through to the ``Gauge``
            constructor.
        """
        super(QueueLengthThread, self).__init__()
        self.daemon = True
        self.queue_length = Gauge(
            "celery_queue_length", "Number of tasks in the queue.",
            ["queue_name"], registry=registry
        )

        # Queue names come from the config: the cleanup queue plus the keys
        # of every entry under "routing_rules" in the router config.
        self.queues = [conf.celery_cleanup_queue]
        for queue_rules in conf.celery_router_config["routing_rules"].values():
            self.queues.extend(queue_rules.keys())
        # Every queue starts with a length of 0.
        for queue_name in self.queues:
            self.queue_length.labels(queue_name).set(0)

        # Get the Celery connection.
        self.connection = celery_app.connection_or_acquire()
        if isinstance(self.connection, FallbackContext):
            self.connection = self.connection.fallback()

    def update_metrics(self):
        """
        Ask the broker for the message count of every configured queue.

        A queue whose lookup raises ``ChannelError`` keeps its previous
        value; any other error is logged and the remaining queues are still
        processed.
        """
        for queue_name in self.queues:
            try:
                log.info("[metrics] Getting %s queue length." % queue_name)
                channel = self.connection.default_channel
                # passive=True is used so the queue is only looked up.
                declared = channel.queue_declare(queue=queue_name, passive=True)
                self.queue_length.labels(queue_name).set(declared.message_count)
            except amqp.exceptions.ChannelError:
                # Queue not created yet.
                continue
            except Exception:  # pragma: no cover
                log.exception("[metrics] Error getting queue length.")

    def run(self):
        """Refresh the metric forever, sleeping 30 seconds between rounds."""
        while True:
            self.update_metrics()
            time.sleep(30)
|
| |
+
|
| |
+
|
| |
registry.register(ComposesCollector())
|
| |
+
|
| |
+
|
| |
# Start the Celery metrics only on Frontend using the `before_first_request` decorator.
@app.before_first_request
def start_celery_metrics():
    """Start the Celery metric threads when the Celery imports succeeded."""
    if not CELERY_AVAILABLE:
        return

    # Both threads are "daemonic": Python stops them automatically when the
    # main thread exits. They only update metrics, so there is no need to
    # .join() them or to shut them down gracefully.
    for thread_class in (WorkerCountThread, QueueLengthThread):
        metrics_thread = thread_class(registry=registry)
        metrics_thread.start()
|
| |
Signed-off-by: Jan Kaluza <jkaluza@redhat.com>