Celery fail silently context manager

Celery ships with a configuration option called CELERY_ALWAYS_EAGER, which causes all tasks to be executed immediately instead of asynchronously on workers. This can be very useful for unit tests. Instead of running a real message queue and separate worker processes, your unit tests can run entirely in one process and still execute the necessary tasks.
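As a rough sketch, a test-only settings module might look like the following. It assumes the old-style uppercase setting names used in this post; Celery 4 and later also accept lowercase names such as task_always_eager. The second setting is optional and is included here only as an assumption about what a test suite would usually want.

```python
# Hypothetical test-only Celery configuration module (e.g. loaded via
# app.config_from_object). Uses the old-style uppercase setting names.

# Run every task in the calling process instead of publishing it to a broker.
CELERY_ALWAYS_EAGER = True

# Let exceptions raised inside eagerly executed tasks propagate to the caller,
# so a failing task fails the test that triggered it.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
```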

But in many cases, those tasks are not necessary for the unit tests to succeed. Say you have a task, fired when you create a user, that sends a welcome email. You don't want the caller to wait while a worker composes a MIME message and contacts the SMTP server; that could take a little time. It's more of a fire-and-forget model. In fact, this is always the case if you don't use a Celery result backend: the caller has no way of receiving a result from the task, so it can't depend on the task in a strict sense, unless it's doing some kind of database polling and waiting.

If this matches your usage pattern for Celery, you may wish you had a setting called CELERY_ALWAYS_FAIL_SILENTLY instead. To save time in your unit tests, you could tell Celery to simply discard any calls to `.delay()` or `.apply_async()`. It would be just as if Celery had accepted the task but hadn't got around to running it yet. Except that it would never run.

It turns out that you can implement this yourself by monkeypatching celery. Here is a context manager that does just that:

from contextlib import contextmanager
from celery import Celery

app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')

@app.task
def mytask():
    print('Inside mytask')

@contextmanager
def celery_fail_silently():
    ''' short-circuit all tasks unless we are in eager mode '''

    from celery.app import current_app
    app = current_app()

    # This function is set on the app instance, not the class, so it is
    # called without an implicit `self` argument.
    def send_task(*args, **kwargs):
        if app.conf.CELERY_ALWAYS_EAGER:
            return original_send_task(*args, **kwargs)
        # Otherwise discard the call; nothing is sent to the broker.

    original_send_task = app.send_task
    app.send_task = send_task
    try:
        yield
    finally:
        # Always restore the real send_task, even if the block raised.
        app.send_task = original_send_task


if __name__ == '__main__':
    #app.conf.update(CELERY_ALWAYS_EAGER=True)
    with celery_fail_silently():
        mytask.delay()

As an added feature, it will execute the task in process if CELERY_ALWAYS_EAGER is set. That way, you can use something like Django's override_settings if you want just a small subset of your unit tests to actually execute their tasks.
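To see the patch-and-restore behavior without a broker, here is a minimal sketch that uses a stand-in object. FakeApp and fail_silently are hypothetical names for illustration, not Celery APIs, and this version takes the app as an argument instead of looking up the current app. It also leaves out the eager-mode check to keep the example short.

```python
from contextlib import contextmanager


class FakeApp(object):
    """Hypothetical stand-in for a Celery app; it only records sent tasks."""

    def __init__(self):
        self.sent = []

    def send_task(self, name, *args, **kwargs):
        self.sent.append(name)
        return name


@contextmanager
def fail_silently(app):
    """Swap app.send_task for a no-op, then restore it on exit."""
    original_send_task = app.send_task

    def send_task(*args, **kwargs):
        # Discard the call: nothing is recorded and nothing is returned.
        return None

    app.send_task = send_task
    try:
        yield app
    finally:
        app.send_task = original_send_task


fake_app = FakeApp()

with fail_silently(fake_app):
    fake_app.send_task('send_welcome_email')  # silently dropped

fake_app.send_task('send_welcome_email')      # recorded, patch is gone

print(fake_app.sent)
```

Only the call made after the with block ends up in fake_app.sent, which is the behavior you want from a test helper: the patch cannot leak into later tests, even when the block raises.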

I originally tried to mock apply_async directly, but that is a bound method on each task, and tasks are bound on import, so you can't easily change them all at runtime.
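The difference between the two approaches can be shown with a simplified model. FakeApp and FakeTask below are hypothetical stand-ins, and the sketch assumes each task looks up its app's send_task at call time; real Celery internals differ in detail and across versions.

```python
class FakeApp(object):
    """Hypothetical stand-in for a Celery app."""

    def send_task(self, name):
        return 'sent ' + name


class FakeTask(object):
    """Hypothetical stand-in for a task that publishes through its app."""

    def __init__(self, app, name):
        self.app = app
        self.name = name

    def apply_async(self):
        # The app's send_task is looked up each time the task is called.
        return self.app.send_task(self.name)


app = FakeApp()
tasks = [FakeTask(app, 'email'), FakeTask(app, 'resize')]

# Patching apply_async on one task object leaves every other task alone,
# so you would have to find and patch each task individually.
tasks[0].apply_async = lambda: None
per_task_results = [task.apply_async() for task in tasks]
del tasks[0].apply_async  # undo the per-task patch

# Patching the one shared send_task silences all tasks at once.
app.send_task = lambda name: None
app_level_results = [task.apply_async() for task in tasks]

print(per_task_results)
print(app_level_results)
```

In this model the per-task patch silences only the first task, while the app-level patch silences both, which is why the context manager targets send_task.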



I'm currently working at NerdWallet, a startup in San Francisco trying to bring clarity to all of life's financial decisions. We're hiring like crazy. Hit me up on Twitter, I would love to talk.
