From cdeaa32ad2e9885e8e2620c2b8149e1459857922 Mon Sep 17 00:00:00 2001 From: Lubomír Sedlář Date: Aug 02 2023 06:14:58 +0000 Subject: Merge all metrics into a single thread All of the background collector threads were running on the same schedule. There's no benefit to actually use four threads when a single one would do just fine. --- diff --git a/server/odcs/server/metrics.py b/server/odcs/server/metrics.py index 5f83c0d..3f65eb0 100644 --- a/server/odcs/server/metrics.py +++ b/server/odcs/server/metrics.py @@ -48,17 +48,19 @@ except ImportError: registry = CollectorRegistry() -class WorkerCountThread(threading.Thread): +class MetricsCollectorThread(threading.Thread): """ Thread providing and updating following metrics: - celery_workers_expected - Number of expected workers. - celery_workers_totals - Number of alive workers. - celery_workers[worker_name] - 1 if worker is online, 0 if offline. + - celery_queue_length[queue_name] - Number of tasks waiting in the queue. + - celery_queue_worker[queue_name] - Number of workers consuming tasks from the queue. """ def __init__(self, registry=None): - super(WorkerCountThread, self).__init__() + super(MetricsCollectorThread, self).__init__() self.daemon = True self.workers_expected = Gauge( "celery_workers_expected", "Number of expected workers", registry=registry ) @@ -78,7 +80,55 @@ class WorkerCountThread(threading.Thread): ) self.worker_names = set() - def update_metrics(self): + self.queue_length = Gauge( + "celery_queue_length", + "Number of tasks in the queue.", + ["queue_name"], + registry=registry, + ) + + # Get all the possible queue names from the config. + self.queues = [conf.celery_cleanup_queue] + for rules in conf.celery_router_config["routing_rules"].values(): + self.queues += rules.keys() + # Initialize the queue length to 0. + for queue in self.queues: + self.queue_length.labels(queue).set(0) + + # Get the Celery connection. 
+ self.connection = celery_app.connection_or_acquire() + if isinstance(self.connection, FallbackContext): + self.connection = self.connection.fallback() + + self.queue_worker = Gauge( + "celery_queue_worker", + "Number of workers consume tasks from the queue.", + ["queue_name"], + registry=registry, + ) + + self.composes_total = Gauge( + "composes_total", + "Total number of composes", + ["source_type", "state"], + registry=registry, + ) + self.raw_config_composes_count_data = {} + self.raw_config_composes_count = Counter( + "raw_config_composes_count", + "Total number of raw_config composes per source", + ["source"], + registry=registry, + ) + self.raw_config_composes_data = {} + self.raw_config_composes = Counter( + "raw_config_composes", + "State with count", + ["source", "state"], + registry=registry, + ) + + def update_worker_metrics(self): log.info("[metrics] Getting number of workers.") try: celery_ping = celery_app.control.ping(timeout=15) @@ -102,41 +152,14 @@ class WorkerCountThread(threading.Thread): def run(self): while True: - self.update_metrics() + self.update_compose_metrics() + if CELERY_AVAILABLE: + self.update_worker_metrics() + self.update_queue_metrics() + self.update_queue_worker_metrics() time.sleep(30) - -class QueueLengthThread(threading.Thread): - """ - Thread providing and updating following metrics: - - - celery_queue_length[queue_name] - Number of tasks waiting in the queue. - """ - - def __init__(self, registry=None): - super(QueueLengthThread, self).__init__() - self.daemon = True - self.queue_length = Gauge( - "celery_queue_length", - "Number of tasks in the queue.", - ["queue_name"], - registry=registry, - ) - - # Get all the possible queue names from the config. - self.queues = [conf.celery_cleanup_queue] - for rules in conf.celery_router_config["routing_rules"].values(): - self.queues += rules.keys() - # Initialize the queue length to 0. 
- for queue in self.queues: - self.queue_length.labels(queue).set(0) - - # Get the Celery connetion. - self.connection = celery_app.connection_or_acquire() - if isinstance(self.connection, FallbackContext): - self.connection = self.connection.fallback() - - def update_metrics(self): + def update_queue_metrics(self): for queue in self.queues: try: log.info("[metrics] Getting %s queue length." % queue) @@ -150,40 +173,7 @@ class QueueLengthThread(threading.Thread): except Exception: # pragma: no cover log.exception("[metrics] Error getting queue length.") - def run(self): - while True: - self.update_metrics() - time.sleep(30) - - -class QueueWorkerThread(threading.Thread): - """ - Thread providing and updating following metrics: - - - celery_queue_worker[queue_name] - Number of workers consume tasks from the queue. - """ - - def __init__(self, registry=None): - super(QueueWorkerThread, self).__init__() - self.daemon = True - self.queue_worker = Gauge( - "celery_queue_worker", - "Number of workers consume tasks from the queue.", - ["queue_name"], - registry=registry, - ) - - # Get all the possible queue names from the config. - self.queues = [conf.celery_cleanup_queue] - for rules in conf.celery_router_config["routing_rules"].values(): - self.queues += rules.keys() - - # Get the Celery connetion. 
- self.connection = celery_app.connection_or_acquire() - if isinstance(self.connection, FallbackContext): - self.connection = self.connection.fallback() - - def update_metrics(self): + def update_queue_worker_metrics(self): log.info("[metrics] Getting queue worker number.") try: active_queues = celery_app.control.inspect().active_queues() @@ -199,38 +189,6 @@ class QueueWorkerThread(threading.Thread): for q in queues: self.queue_worker.labels(q["name"]).inc() - def run(self): - while True: - self.update_metrics() - time.sleep(30) - - -class ComposeCollectorThread(threading.Thread): - def __init__(self, registry=None): - super(ComposeCollectorThread, self).__init__() - self.daemon = True - - self.composes_total = Gauge( - "composes_total", - "Total number of composes", - ["source_type", "state"], - registry=registry, - ) - self.raw_config_composes_count_data = {} - self.raw_config_composes_count = Counter( - "raw_config_composes_count", - "Total number of raw_config composes per source", - ["source"], - registry=registry, - ) - self.raw_config_composes_data = {} - self.raw_config_composes = Counter( - "raw_config_composes", - "State with count", - ["source", "state"], - registry=registry, - ) - def update_composes_total(self): """ Updates `composes_total` metric with number of composes for each state @@ -314,32 +272,19 @@ class ComposeCollectorThread(threading.Thread): self.raw_config_composes_data[(source, "removed")] = count self.raw_config_composes.labels(source, "removed").inc(increment) - def update_metrics(self): + def update_compose_metrics(self): log.info("[metrics] Getting compose counts.") self.update_composes_total() self.update_raw_config_composes_count() self.update_raw_config_types() - def run(self): - while True: - self.update_metrics() - time.sleep(30) - # Start the Celery metrics only on Frontend using the `before_first_request` decorator. @app.before_first_request def start_celery_metrics(): - # These threads are "daemonic". 
This means they are stopped automatically - # by Python when main Python thread is stopped. There is no need to .join() - # and since they are only updating metrics, they do not have to end up + # This thread is "daemonic". This means it is stopped automatically by + # Python when main Python thread is stopped. There is no need to .join() + # and since it is only updating metrics, it does not have to end up # gracefully and can just be "killed" by Python. - composes_collector_thread = ComposeCollectorThread(registry=registry) - composes_collector_thread.start() - - if CELERY_AVAILABLE: - worker_count_thread = WorkerCountThread(registry=registry) - worker_count_thread.start() - queue_length_thread = QueueLengthThread(registry=registry) - queue_length_thread.start() - queue_worker_thread = QueueWorkerThread(registry=registry) - queue_worker_thread.start() + metrics_collector_thread = MetricsCollectorThread(registry=registry) + metrics_collector_thread.start() diff --git a/server/tests/test_metrics.py b/server/tests/test_metrics.py index 9dd33d4..f083b00 100644 --- a/server/tests/test_metrics.py +++ b/server/tests/test_metrics.py @@ -26,18 +26,14 @@ from odcs.server import db from odcs.server.models import Compose from odcs.common.types import COMPOSE_RESULTS from odcs.server.pungi import PungiSourceType -from odcs.server.metrics import ( - ComposeCollectorThread, - QueueLengthThread, - WorkerCountThread, -) +from odcs.server.metrics import MetricsCollectorThread from .utils import ModelsBaseTest class TestComposesCollector(ModelsBaseTest): def setUp(self): super(TestComposesCollector, self).setUp() - self.thread = ComposeCollectorThread() + self.thread = MetricsCollectorThread() def test_composes_total(self): Compose.create( @@ -58,7 +54,7 @@ class TestComposesCollector(ModelsBaseTest): ) db.session.commit() - self.thread.update_metrics() + self.thread.update_compose_metrics() metrics = self.thread.raw_config_composes_count.collect() for sample in metrics[0].samples: 
if ( @@ -89,7 +85,7 @@ class TestComposesCollector(ModelsBaseTest): 60, ) db.session.commit() - self.thread.update_metrics() + self.thread.update_compose_metrics() metrics = self.thread.raw_config_composes_count.collect() for sample in metrics[0].samples: if sample.name == "raw_config_composes_count_total": @@ -106,8 +102,8 @@ class TestQueueLengthThread(ModelsBaseTest): queue_declare = conn.default_channel.queue_declare queue_declare.return_value.message_count = 10 - thread = QueueLengthThread() - thread.update_metrics() + thread = MetricsCollectorThread() + thread.update_queue_metrics() metrics = thread.queue_length.collect() for metric in metrics: queues = set() @@ -131,8 +127,8 @@ class TestWorkerCountThread(ModelsBaseTest): ] # Both workers online. - thread = WorkerCountThread() - thread.update_metrics() + thread = MetricsCollectorThread() + thread.update_worker_metrics() metrics = thread.workers_total.collect() self.assertEqual(metrics[0].samples[0].value, 2) metrics = thread.workers.collect() @@ -141,7 +137,7 @@ class TestWorkerCountThread(ModelsBaseTest): self.assertEqual(sample.value, 1) # The worker-2 went offline. - thread.update_metrics() + thread.update_worker_metrics() metrics = thread.workers_total.collect() self.assertEqual(metrics[0].samples[0].value, 1) metrics = thread.workers.collect()