From b9163f04c58f08a12a173b3faee3590f220a760e Mon Sep 17 00:00:00 2001 From: cfmack Date: Tue, 31 Dec 2024 15:46:49 -0600 Subject: [PATCH] Enable multi-node support --- ccwatch.yaml.sample | 10 ++++ celery_cloudwatch/__main__.py | 4 +- celery_cloudwatch/__version__.py | 2 +- celery_cloudwatch/cloudwatch_camera.py | 66 +++++++++++++++++------- celery_cloudwatch/state.py | 70 +++++++++++++++----------- celery_cloudwatch/task_monitor.py | 2 +- 6 files changed, 104 insertions(+), 50 deletions(-) diff --git a/ccwatch.yaml.sample b/ccwatch.yaml.sample index d783a6d..c32562f 100644 --- a/ccwatch.yaml.sample +++ b/ccwatch.yaml.sample @@ -2,12 +2,22 @@ ccwatch: broker: null camera: celery_cloudwatch.CloudWatchCamera verbose: no + task-prefix: myprefix camera: frequency: 60.0 verbose: no cloudwatch-camera: dryrun: no namespace: celery + metrics: + - CeleryEventSent + - CeleryEventStarted + - CeleryEventSucceeded + - CeleryEventFailed + - CeleryNumWaiting + - CeleryNumRunning + - CeleryWaitingTime + - CeleryProcessingTime tasks: - myapp.mytasks.taskname - myapp.mytasks.anothertask diff --git a/celery_cloudwatch/__main__.py b/celery_cloudwatch/__main__.py index 56d9805..ebb07e8 100644 --- a/celery_cloudwatch/__main__.py +++ b/celery_cloudwatch/__main__.py @@ -14,7 +14,8 @@ v.Optional('ccwatch', default={}): v.Schema({ v.Optional('broker', default=None): v.Any(None, v_str), v.Optional('camera', default="celery_cloudwatch.CloudWatchCamera"): v_str, - v.Optional('verbose', default=False): bool + v.Optional('verbose', default=False): bool, + v.Optional('task-prefix', default=None): v.Any(None, v_str), }, extra=False), v.Optional('camera', default={}): v.Schema({ v.Optional('frequency', default=60.0): v.Any(int, float), @@ -23,6 +24,7 @@ v.Optional('cloudwatch-camera', default={}): v.Schema({ v.Optional('dryrun', default=False): bool, v.Optional('namespace', default='celery'): v_str, + v.Optional('metrics', default=[]): [v_str], v.Optional('tasks', default=[]): v.Schema([ v_str, v.Schema({ 'name': v_str, diff --git a/celery_cloudwatch/__version__.py b/celery_cloudwatch/__version__.py index afced14..3f39079 100644 --- a/celery_cloudwatch/__version__.py +++ b/celery_cloudwatch/__version__.py @@ -1 +1 @@ -__version__ = '2.0.0' +__version__ = '2.0.1' diff --git a/celery_cloudwatch/cloudwatch_camera.py b/celery_cloudwatch/cloudwatch_camera.py index 88cbb39..13c7b7b 100644 --- a/celery_cloudwatch/cloudwatch_camera.py +++ b/celery_cloudwatch/cloudwatch_camera.py @@ -1,4 +1,4 @@ -import fnmatch +import re import itertools import json import logging @@ -23,6 +23,10 @@ def __init__(self, state, config, cloudwatch_client=None): self.verbose = config['camera']['verbose'] if not config['cloudwatch-camera']['dryrun'] and cloudwatch_client is None: cloudwatch_client = boto3.client('cloudwatch') + self.reported_metrics = [] + if 'metrics' in config['cloudwatch-camera']: + self.reported_metrics = config['cloudwatch-camera']['metrics'] + self.cloudwatch_client = cloudwatch_client self.cloud_watch_namespace = config['cloudwatch-camera']['namespace'] self.task_mapping = {} @@ -90,24 +94,32 @@ def _build_metrics(self, state): def _add_task_events(self, metrics, task_event_sent, task_event_started, task_event_succeeded, task_event_failed, num_waiting_by_task, num_running_by_task, time_to_start, time_to_process): for task_name, dimensions in self.task_mapping.items(): - metrics.add('CeleryEventSent', unit='Count', value=task_event_sent.get(task_name, 0), dimensions=dimensions) - metrics.add('CeleryEventStarted', unit='Count', value=task_event_started.get(task_name, 0), dimensions=dimensions) - metrics.add('CeleryEventSucceeded', unit='Count', value=task_event_succeeded.get(task_name, 0), dimensions=dimensions) - metrics.add('CeleryEventFailed', unit='Count', value=task_event_failed.get(task_name, 0), dimensions=dimensions) - metrics.add('CeleryNumWaiting', unit='Count', value=num_waiting_by_task.get(task_name, 0), dimensions=dimensions) - metrics.add('CeleryNumRunning', unit='Count', value=num_running_by_task.get(task_name, 0), dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSent', unit='Count', + value=task_event_sent.get(task_name, 0), dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventStarted', unit='Count', + value=task_event_started.get(task_name, 0), dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSucceeded', unit='Count', + value=task_event_succeeded.get(task_name, 0), dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventFailed', unit='Count', + value=task_event_failed.get(task_name, 0), dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumWaiting', unit='Count', + value=num_waiting_by_task.get(task_name, 0), dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumRunning', unit='Count', + value=num_running_by_task.get(task_name, 0), dimensions=dimensions) waiting_time = time_to_start.get(task_name) if waiting_time: - metrics.add('CeleryWaitingTime', unit='Seconds', dimensions=dimensions, stats=waiting_time.__dict__.copy()) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryWaitingTime', unit='Seconds', + stats=waiting_time.__dict__.copy(), dimensions=dimensions) + running_time = time_to_process.get(task_name) if running_time: - metrics.add('CeleryProcessingTime', unit='Seconds', dimensions=dimensions, stats=running_time.__dict__.copy()) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryProcessingTime', unit='Seconds', + stats=running_time.__dict__.copy(), dimensions=dimensions) def _add_task_groups(self, metrics, task_event_sent, task_event_started, task_event_succeeded, task_event_failed, num_waiting_by_task, num_running_by_task, time_to_start, time_to_process): all_task_names = set(itertools.chain(task_event_sent, task_event_started, task_event_succeeded, task_event_failed)) - for task_group in self.task_groups: dimensions = task_group['dimensions'] waiting = 0 @@ -124,7 +136,7 @@ def _add_task_groups(self, metrics, task_event_sent, task_event_started, task_ev task_names = [] for task_name in all_task_names: for pattern in patterns: - if fnmatch.fnmatchcase(pattern, task_name): + if re.fullmatch(pattern, task_name): task_names.append(task_name) break else: @@ -148,18 +160,34 @@ def _add_task_groups(self, metrics, task_event_sent, task_event_started, task_ev running_time = Stats() running_time += task_run_time - metrics.add('CeleryEventSent', unit='Count', value=waiting, dimensions=dimensions) - metrics.add('CeleryEventStarted', unit='Count', value=running, dimensions=dimensions) - metrics.add('CeleryEventSucceeded', unit='Count', value=completed, dimensions=dimensions) - metrics.add('CeleryEventFailed', unit='Count', value=failed, dimensions=dimensions) - metrics.add('CeleryNumWaiting', unit='Count', value=num_waiting, dimensions=dimensions) - metrics.add('CeleryNumRunning', unit='Count', value=num_running, dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSent', unit='Count', value=waiting, + dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventStarted', unit='Count', value=running, + dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventSucceeded', unit='Count', value=completed, + dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryEventFailed', unit='Count', value=failed, + dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumWaiting', unit='Count', value=num_waiting, + dimensions=dimensions) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryNumRunning', unit='Count', value=num_running, + dimensions=dimensions) if waiting_time: - metrics.add('CeleryWaitingTime', unit='Seconds', dimensions=dimensions, stats=waiting_time.__dict__.copy()) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryWaitingTime', unit='Seconds', + stats=waiting_time.__dict__.copy(), dimensions=dimensions) if running_time: - metrics.add('CeleryProcessingTime', unit='Seconds', dimensions=dimensions, stats=running_time.__dict__.copy()) + metrics = self.add_metrics(metrics=metrics, task_name='CeleryProcessingTime', unit='Seconds', + stats=running_time.__dict__.copy(), dimensions=dimensions) + def add_metrics(self, metrics, task_name, unit, dimensions, value=None, stats=None): + if len(self.reported_metrics) == 0 or task_name in self.reported_metrics: + if stats is not None: + metrics.add(task_name, unit=unit, dimensions=dimensions, stats=stats) + else: + metrics.add(task_name, unit=unit, dimensions=dimensions, value=value) + return metrics + def xchunk(arr, size): for x in range(0, len(arr), size): yield arr[x:x+size] diff --git a/celery_cloudwatch/state.py b/celery_cloudwatch/state.py index f11b861..c35b2d5 100644 --- a/celery_cloudwatch/state.py +++ b/celery_cloudwatch/state.py @@ -12,7 +12,7 @@ class State(object): # http://docs.celeryproject.org/en/latest/userguide/monitoring.html - def __init__(self): + def __init__(self, config): self._mutex = threading.Lock() # track the number of events in the current window @@ -24,6 +24,7 @@ def __init__(self): self.time_to_start = defaultdict(Stats) self.time_to_process = defaultdict(Stats) + self.config = config self.registry = {} @@ -67,41 +68,51 @@ def num_waiting_running_by_task(self): def task_sent(self, event): with self._mutex: - uuid = event['uuid'] - if uuid not in self.registry: - task_name = event['name'] - self.registry[uuid] = TaskRecord(task_name, event['timestamp'], None, None, None) - self.task_event_sent[task_name] += 1 - return - - task_record = self.registry[uuid]._replace( - name=event['name'], - sent_at=event['timestamp'] - ) - self.registry[uuid] = task_record - self.task_event_sent[task_record.name] += 1 - - if task_record.started_at is None: - return + self._initiate_task(event) - self.task_event_started[task_record.name] += 1 - self.time_to_start[task_record.name] += task_record.wait_duration - if not task_record.finished: - return - del self.registry[uuid] - if task_record.successful: - self.task_event_succeeded[task_record.name] += 1 - self.time_to_process[task_record.name] += task_record.processing_duration + def _initiate_task(self, event): + uuid = event['uuid'] + if uuid not in self.registry: + if 'name' in event: + task_name = event['name'] else: - self.task_event_failed[task_record.name] += 1 + if 'task-prefix' in self.config['ccwatch'] and self.config['ccwatch']['task-prefix']: + task_name = "{}-{}".format(self.config['ccwatch']['task-prefix'], uuid) + else: + task_name = uuid + + self.registry[uuid] = TaskRecord(task_name, event['timestamp'], None, None, None) + self.task_event_sent[task_name] += 1 + return + + task_record = self.registry[uuid]._replace( + name=event['name'], + sent_at=event['timestamp'] + ) + self.registry[uuid] = task_record + self.task_event_sent[task_record.name] += 1 + + if task_record.started_at is None: + return + + self.task_event_started[task_record.name] += 1 + self.time_to_start[task_record.name] += task_record.wait_duration + if not task_record.finished: + return + del self.registry[uuid] + if task_record.successful: + self.task_event_succeeded[task_record.name] += 1 + self.time_to_process[task_record.name] += task_record.processing_duration + else: + self.task_event_failed[task_record.name] += 1 def task_started(self, event): with self._mutex: uuid = event['uuid'] task_record = self.registry.get(uuid, None) if task_record is None: - self.registry[uuid] = TaskRecord(None, None, event['timestamp'], None, None) - return + self._initiate_task(event) + task_record = self.registry.get(uuid, None) task_record = task_record._replace(started_at=event['timestamp']) self.registry[uuid] = task_record @@ -192,6 +203,9 @@ def __getnewargs__(self): 'Return self as a plain tuple. Used by copy and pickle.' return tuple(self) + def print(self): + print("TaskRecord: {} - {} - {} - {} - {}".format(self.name, self.sent_at, self.started_at, self.succeeded_at, self.failed_at)) + name = _property(_itemgetter(0), doc='Alias for field number 0') sent_at = _property(_itemgetter(1), doc='Alias for field number 1') diff --git a/celery_cloudwatch/task_monitor.py b/celery_cloudwatch/task_monitor.py index 7c83b0f..2e86e0d 100644 --- a/celery_cloudwatch/task_monitor.py +++ b/celery_cloudwatch/task_monitor.py @@ -21,7 +21,7 @@ def __init__(self, broker=None, camera='celery_cloudwatch.PrintCamera', def run(self): app = Celery(broker=self.broker) - state = State() + state = State(self.config) factory = CameraFactory(self.camera) camera = factory.camera(state, self.config)