Source code for hirefire.procs.celery

from __future__ import absolute_import
from collections import Counter
from itertools import chain
from logging import getLogger

from celery.app import app_or_default

try:
    from librabbitmq import ChannelError
except ImportError:
    try:
        from amqp.exceptions import ChannelError
    except ImportError:
        # No RabbitMQ API wrapper installed; a different Celery broker is in use
        ChannelError = Exception

from ..utils import KeyDefaultDict
from . import Proc


logger = getLogger('hirefire')


class CeleryInspector(KeyDefaultDict):
    """
    A defaultdict that manages the celery inspector cache.
    """

    def __init__(self, app, simple_queues=False):
        super(CeleryInspector, self).__init__(self.get_status_task_counts)
        self.app = app
        self.simple_queues = simple_queues
        self.route_queues = None

    @classmethod
    def simple_queues(cls, *args, **kwargs):
        """Alternate constructor for the ``simple_queues=True`` cache.

        Note that instances shadow this classmethod with the boolean
        ``simple_queues`` attribute set in ``__init__``.
        """
        return cls(*args, simple_queues=True, **kwargs)

    def get_route_queues(self):
        """Find the queue to each active routing pair.

        Cache to avoid additional calls to inspect().

        Returns a mapping from (exchange, routing_key) to queue_name.
        """
        if self.route_queues is not None:
            return self.route_queues

        worker_queues = self.inspect['active_queues']
        active_queues = chain.from_iterable(worker_queues.values())

        self.route_queues = {
            (queue['exchange']['name'], queue['routing_key']): queue['name']
            for queue in active_queues
        }
        return self.route_queues
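    # A sketch of the mapping built above; the queue names and routing
    # pairs are illustrative assumptions, not live broker data:
    #
    #     {('celery', 'celery'): 'celery',
    #      ('media', 'media.video'): 'video'}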

    @property
    def inspect(self):
        """Proxy the inspector.

        Make it easy to get the return value from an inspect method.
        Use it like a dictionary, with the desired method as the key.
        """
        allowed_methods = ['active_queues', 'active', 'reserved', 'scheduled']
        inspect = self.app.control.inspect()

        def get_inspect_value(method):
            if method not in allowed_methods:
                raise KeyError('Method not allowed: {}'.format(method))
            return getattr(inspect, method)() or {}

        return KeyDefaultDict(get_inspect_value)
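    # Example of the dictionary-style access this enables (a sketch;
    # the worker name and task payloads are illustrative only):
    #
    #     self.inspect['active']
    #     # -> {'celery@worker1': [{'delivery_info': {...}, ...}], ...}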

    def get_queue_fn(self, status):
        """Get a queue identifier function for the given status.

        Scheduled tasks have a different layout from reserved and
        active tasks, so we need to look up the queue differently.
        Additionally, if the ``simple_queues`` flag is True, then
        we can shortcut the lookup process and avoid getting
        the route queues.
        """
        if not self.simple_queues:
            route_queues = self.get_route_queues()

        def identify_queue(delivery_info):
            exchange = delivery_info['exchange']
            routing_key = delivery_info['routing_key']
            if self.simple_queues:
                # If the exchange is '', use the routing_key instead
                return exchange or routing_key
            try:
                return route_queues[exchange, routing_key]
            except KeyError:
                msg = 'exchange, routing_key pair not found: {}'.format(
                    (exchange, routing_key))
                logger.warning(msg)
                return None  # Special queue name, not expected to be used

        def get_queue(task):
            if status == 'scheduled':
                return identify_queue(task['request']['delivery_info'])
            return identify_queue(task['delivery_info'])
        return get_queue
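    # The two task layouts handled above, sketched with assumed field
    # values (the nesting, not the values, is what matters):
    #
    #     active/reserved: {'delivery_info': {'exchange': 'celery',
    #                                         'routing_key': 'celery'}}
    #     scheduled: {'request': {'delivery_info': {'exchange': 'celery',
    #                                               'routing_key': 'celery'}}}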

    def get_status_task_counts(self, status):
        """Get the tasks on all queues for the given status.

        This is called lazily to avoid running long methods when not needed.
        """
        if status not in ['active', 'reserved', 'scheduled']:
            raise KeyError('Invalid task status: {}'.format(status))

        tasks = chain.from_iterable(self.inspect[status].values())
        queues = map(self.get_queue_fn(status), tasks)

        if status == 'scheduled':
            queues = set(queues)  # Only count each queue once

        return Counter(queues)
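    # e.g. Counter({'celery': 3, 'video': 1}); for 'scheduled', each
    # queue counts at most once because the queues are deduplicated
    # into a set above. (Queue names here are illustrative.)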


class CeleryProc(Proc):
    """
    A proc class for the `Celery <http://celeryproject.org>`_ library.

    :param name: the name of the proc (required)
    :param queues: list of queue names to check (required)
    :param app: the Celery app to check for the queues (optional)
    :type name: str
    :type queues: str or list
    :type app: :class:`~celery.Celery`

    Declarative example::

        from celery import Celery
        from hirefire.procs.celery import CeleryProc

        celery = Celery('myproject', broker='amqp://guest@localhost//')

        class WorkerProc(CeleryProc):
            name = 'worker'
            queues = ['celery']
            app = celery

    Or a simpler variant::

        worker_proc = CeleryProc('worker', queues=['celery'], app=celery)

    In case you use one of the non-standard Celery clients (e.g.
    django-celery) you can leave the ``app`` attribute empty because
    Celery will automatically find the correct Celery app::

        from hirefire.procs.celery import CeleryProc

        class WorkerProc(CeleryProc):
            name = 'worker'
            queues = ['celery']

    Querying the tasks that are on the workers is a more expensive
    process, and if you're sure that you don't need them, you can
    improve the response time by not looking for some statuses. The
    default statuses that are looked for are ``active``, ``reserved``,
    and ``scheduled``. You can configure the proc to *not* look for
    those statuses by overriding the ``inspect_statuses`` property.
    For example, this proc would not look at any tasks held by the
    workers::

        class WorkerProc(CeleryProc):
            name = 'worker'
            queues = ['celery']
            inspect_statuses = []

    ``scheduled`` tasks are tasks that have been triggered with an
    ``eta``, the most common example of which is using ``retry`` on
    tasks. If you're sure you aren't using these tasks, you can skip
    querying for them.

    ``reserved`` tasks are tasks that have been taken from the queue
    by the main process (coordinator) on the worker dyno, but have not
    yet been given to a worker to run. If you've configured Celery to
    only fetch the tasks that it is currently running, then you may be
    able to skip querying for these tasks. See
    http://docs.celeryproject.org/en/latest/userguide/optimizing.html#prefetch-limits
    for more information.

    ``active`` tasks are currently running tasks. If your tasks are
    short-lived enough, then you may not need to look for these tasks.
    If you choose to not look at active tasks, look out for
    ``WorkerLostError`` exceptions. See
    https://github.com/celery/celery/issues/2839 for more information.

    If you have a particularly simple case, you can use a shortcut to
    eliminate one inspect call when inspecting statuses. The
    ``active_queues`` inspect call is needed to map ``exchange`` and
    ``routing_key`` back to the Celery ``queue`` they belong to. If all
    of your ``queue``, ``exchange``, and ``routing_key`` names are the
    same (which is the default in Celery), then you can use the
    ``simple_queues = True`` flag to note that all the queues in the
    proc use the same name for their ``exchange`` and ``routing_key``.
    This defaults to ``False`` for backward compatibility, but if your
    queues use this simple setup, you're encouraged to enable it like
    so::

        class WorkerProc(CeleryProc):
            name = 'worker'
            queues = ['celery']
            simple_queues = True

    Because of how this is implemented, you will almost certainly want
    to use this flag on all of your procs or on none of them: the two
    variants keep separate caches that make separate calls to the
    inspect methods, so mixing them means the inspect calls run twice.
    """

    #: The name of the proc (required).
    name = None

    #: The list of queues to check (required).
    queues = ['celery']

    #: The Celery app to check for the queues (optional).
    app = None

    #: The Celery task statuses to check for on workers (optional).
    #: Valid options are 'active', 'reserved', and 'scheduled'.
    inspect_statuses = ['active', 'reserved', 'scheduled']

    #: Whether or not the exchange and routing_key are the same
    #: as the queue name for the queues in this proc.
    #: Default: False.
    simple_queues = False

    def __init__(self, app=None, *args, **kwargs):
        super(CeleryProc, self).__init__(*args, **kwargs)
        if app is not None:
            self.app = app
        self.app = app_or_default(self.app)

    @staticmethod
    def _get_redis_task_count(channel, queue):
        return channel.client.llen(queue)

    @staticmethod
    def _get_rabbitmq_task_count(channel, queue):
        try:
            return channel.queue_declare(
                queue=queue, passive=True).message_count
        except ChannelError:
            logger.warning(
                "The requested queue %s has not been created yet", queue)
            return 0

    def quantity(self, cache=None, **kwargs):
        """
        Return the aggregated number of tasks across the proc's queues.
        """
        with self.app.connection_or_acquire() as connection:
            channel = connection.channel()
            # Redis
            if hasattr(channel, '_size'):
                return sum(self._get_redis_task_count(channel, queue)
                           for queue in self.queues)
            # RabbitMQ
            count = sum(self._get_rabbitmq_task_count(channel, queue)
                        for queue in self.queues)
            if cache is not None and self.inspect_statuses:
                count += self.inspect_count(cache)
            return count

    def inspect_count(self, cache):
        """Use Celery's inspect() methods to count tasks on workers."""
        cache.setdefault('celery_inspect', {
            True: KeyDefaultDict(CeleryInspector.simple_queues),
            False: KeyDefaultDict(CeleryInspector),
        })
        celery_inspect = cache['celery_inspect'][self.simple_queues][self.app]
        return sum(
            celery_inspect[status][queue]
            for status in self.inspect_statuses
            for queue in self.queues
        )
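# A minimal usage sketch, assuming a project named 'myproject' with a
# RabbitMQ broker on localhost; the proc class and queue names below are
# illustrative, following the docstring examples above:
#
#     from celery import Celery
#     from hirefire.procs.celery import CeleryProc
#
#     celery = Celery('myproject', broker='amqp://guest@localhost//')
#
#     class WorkerProc(CeleryProc):
#         name = 'worker'
#         queues = ['celery']
#         simple_queues = True
#         app = celery
#
#     # Sharing one cache dict across procs reuses the CeleryInspector
#     # instances, so the expensive inspect() broadcasts run only once
#     # per request instead of once per proc.
#     cache = {}
#     count = WorkerProc().quantity(cache=cache)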