#414 Add metrics for Celery workers and queues.
Merged 3 years ago by lsedlar. Opened 3 years ago by jkaluza.
jkaluza/odcs metrics into master

file modified
+121 -1
@@ -22,12 +22,23 @@
  #
  # Written by Jan Kaluza <jkaluza@redhat.com>

+ import time
+ import threading
  from sqlalchemy import func
- from prometheus_client import CollectorRegistry
+ from prometheus_client import CollectorRegistry, Gauge
  from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily

  from odcs.common.types import COMPOSE_STATES, PUNGI_SOURCE_TYPE_NAMES
  from odcs.server.models import Compose
+ from odcs.server import log, conf, app
+ 
+ try:
+     from odcs.server.celery_tasks import celery_app
+     from celery.utils.objects import FallbackContext
+     import amqp.exceptions
+     CELERY_AVAILABLE = True
+ except ImportError:
+     CELERY_AVAILABLE = False


  registry = CollectorRegistry()
@@ -93,4 +104,113 @@
          yield self.raw_config_composes_count()


+ class WorkerCountThread(threading.Thread):
+     """
+     Thread providing and updating the following metrics:
+ 
+     - celery_workers_totals - Number of alive workers.
+     - celery_workers[worker_name] - 1 if worker is online, 0 if offline.
+     """
+     def __init__(self, registry=None):
+         super(WorkerCountThread, self).__init__()
+         self.daemon = True
+         self.workers_total = Gauge(
+             "celery_workers_totals", "Number of alive workers", registry=registry
+         )
+         self.workers_total.set(0)
+         self.workers = Gauge(
+             "celery_workers", "Worker is online (1) or offline (0)",
+             ["worker_name"], registry=registry
+         )
+         self.worker_names = set()
+ 
+     def update_metrics(self):
+         log.info("[metrics] Getting number of workers.")
+         try:
+             celery_ping = celery_app.control.ping(timeout=15)
+         except Exception:  # pragma: no cover
+             log.exception("[metrics] Error pinging workers.")
+             return
+ 
+         # Set total number of workers.
+         self.workers_total.set(len(celery_ping))
+ 
+         # Set all known workers to 0 to mark them offline.
+         for workers in celery_ping:
+             self.worker_names |= set(workers.keys())
+         for worker_name in self.worker_names:
+             self.workers.labels(worker_name).set(0)
+ 
+         # Set online workers to 1.
+         for workers in celery_ping:
+             for worker_name in workers.keys():
+                 self.workers.labels(worker_name).set(1)
+ 
+     def run(self):
+         while True:
+             self.update_metrics()
+             time.sleep(30)
+ 
+ 
+ class QueueLengthThread(threading.Thread):
+     """
+     Thread providing and updating the following metrics:
+ 
+     - celery_queue_length[queue_name] - Number of tasks waiting in the queue.
+     """
+     def __init__(self, registry=None):
+         super(QueueLengthThread, self).__init__()
+         self.daemon = True
+         self.queue_length = Gauge(
+             "celery_queue_length", "Number of tasks in the queue.",
+             ["queue_name"], registry=registry
+         )
+ 
+         # Get all the possible queue names from the config.
+         self.queues = [conf.celery_cleanup_queue]
+         for rules in conf.celery_router_config["routing_rules"].values():
+             self.queues += rules.keys()
+         # Initialize the queue length to 0.
+         for queue in self.queues:
+             self.queue_length.labels(queue).set(0)
+ 
+         # Get the Celery connection.
+         self.connection = celery_app.connection_or_acquire()
+         if isinstance(self.connection, FallbackContext):
+             self.connection = self.connection.fallback()
+ 
+     def update_metrics(self):
+         for queue in self.queues:
+             try:
+                 log.info("[metrics] Getting %s queue length." % queue)
+                 length = self.connection.default_channel.queue_declare(
+                     queue=queue, passive=True
+                 ).message_count
+                 self.queue_length.labels(queue).set(length)
+             except amqp.exceptions.ChannelError:
+                 # Queue not created yet.
+                 pass
+             except Exception:  # pragma: no cover
+                 log.exception("[metrics] Error getting queue length.")
+ 
+     def run(self):
+         while True:
+             self.update_metrics()
+             time.sleep(30)
+ 
+ 
  registry.register(ComposesCollector())
+ 
+ 
+ # Start the Celery metrics only on the frontend, using the `before_first_request` decorator.
+ @app.before_first_request
+ def start_celery_metrics():
+     if CELERY_AVAILABLE:
+         # These threads are "daemonic". This means they are stopped automatically
+         # by Python when the main Python thread is stopped. There is no need to .join()
+         # them, and since they only update metrics, they do not have to exit
+         # gracefully and can simply be "killed" by Python.
+         worker_count_thread = WorkerCountThread(registry=registry)
+         worker_count_thread.start()
+         queue_length_thread = QueueLengthThread(registry=registry)
+         queue_length_thread.start()
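
For readers unfamiliar with the mechanism QueueLengthThread relies on: a passive queue_declare asks the broker about an existing queue instead of creating it, and the broker's reply carries the number of waiting messages. The standalone sketch below illustrates that call outside of ODCS; the broker URL and the "pungi_composes" queue name are assumptions chosen for illustration (the queue names ODCS actually uses show up in the tests below), not code from this pull request.

import amqp.exceptions
from celery import Celery
from celery.utils.objects import FallbackContext

# Assumed broker URL for this illustration; ODCS reads its own from configuration.
celery_app = Celery(broker="amqp://localhost//")

connection = celery_app.connection_or_acquire()
if isinstance(connection, FallbackContext):
    # No connection pool is available, fall back to a standalone connection.
    connection = connection.fallback()

try:
    # passive=True only inspects the queue; when the queue does not exist yet,
    # the broker answers with a ChannelError (404) instead of creating it.
    reply = connection.default_channel.queue_declare(
        queue="pungi_composes", passive=True
    )
    print("tasks waiting:", reply.message_count)
except amqp.exceptions.ChannelError:
    print("queue not created yet")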

file modified
+61 -1
@@ -20,11 +20,15 @@
  #
  # Written by Jan Kaluza <jkaluza@redhat.com>

+ from mock import patch
+ 
  from odcs.server import db
  from odcs.server.models import Compose
  from odcs.common.types import COMPOSE_RESULTS
  from odcs.server.pungi import PungiSourceType
- from odcs.server.metrics import ComposesCollector
+ from odcs.server.metrics import (
+     ComposesCollector, QueueLengthThread, WorkerCountThread
+ )
  from .utils import ModelsBaseTest


@@ -69,3 +73,59 @@
                  self.assertEqual(sample.value, 15)
              elif sample.labels["source"] == "foo#other_commits_or_branches":
                  self.assertEqual(sample.value, 10)
+ 
+ 
+ class TestQueueLengthThread(ModelsBaseTest):
+ 
+     @patch("odcs.server.metrics.celery_app")
+     def test_update_metrics(self, celery_app):
+         conn = celery_app.connection_or_acquire.return_value
+         queue_declare = conn.default_channel.queue_declare
+         queue_declare.return_value.message_count = 10
+ 
+         thread = QueueLengthThread()
+         thread.update_metrics()
+         metrics = thread.queue_length.collect()
+         for metric in metrics:
+             queues = set()
+             for sample in metric.samples:
+                 queues.add(sample.labels["queue_name"])
+                 self.assertEqual(sample.value, 10)
+             self.assertEqual(queues, set(["cleanup", "pungi_composes", "pulp_composes"]))
+ 
+ 
+ class TestWorkerCountThread(ModelsBaseTest):
+ 
+     @patch("odcs.server.metrics.celery_app")
+     def test_update_metrics(self, celery_app):
+         celery_app.control.ping.side_effect = [
+             [
+                 {"worker-1@localhost": {"ok": "pong"}},
+                 {"worker-2@localhost": {"ok": "pong"}}
+             ],
+             [
+                 {"worker-1@localhost": {"ok": "pong"}}
+             ],
+         ]
+ 
+         # Both workers online.
+         thread = WorkerCountThread()
+         thread.update_metrics()
+         metrics = thread.workers_total.collect()
+         self.assertEqual(metrics[0].samples[0].value, 2)
+         metrics = thread.workers.collect()
+         for metric in metrics:
+             for sample in metric.samples:
+                 self.assertEqual(sample.value, 1)
+ 
+         # worker-2 went offline.
+         thread.update_metrics()
+         metrics = thread.workers_total.collect()
+         self.assertEqual(metrics[0].samples[0].value, 1)
+         metrics = thread.workers.collect()
+         for metric in metrics:
+             for sample in metric.samples:
+                 if sample.labels["worker_name"] == "worker-1@localhost":
+                     self.assertEqual(sample.value, 1)
+                 else:
+                     self.assertEqual(sample.value, 0)
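
As a closing note on how these gauges become visible to Prometheus: everything is registered in the module-level `registry` created in metrics.py, which a frontend HTTP endpoint can render with prometheus_client. A minimal, hypothetical sketch follows; the Flask app and the /metrics route below are assumptions for illustration, not code from this pull request.

from flask import Flask, Response
from prometheus_client import (
    CollectorRegistry, Gauge, generate_latest, CONTENT_TYPE_LATEST
)

app = Flask(__name__)
registry = CollectorRegistry()

# Stand-in gauge; in ODCS the registry is populated by the collectors and
# threads defined in metrics.py.
Gauge("example_gauge", "Example value", registry=registry).set(1)


@app.route("/metrics")
def metrics():
    # Render every metric registered in this registry in the Prometheus
    # text exposition format.
    return Response(generate_latest(registry), content_type=CONTENT_TYPE_LATEST)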