django-celery: blacklist errors by class

Celery has a built-in mechanism for emailing uncaught task errors to admins. This is great for troubleshooting. Sending the emails directly to developers is also a good way to make sure the errors actually get fixed. I know 1,000 emails in my inbox motivates me!

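However, you may not want to receive every error. For example, our Celery instance routinely throws TimeLimitExceeded exceptions that can be safely ignored, at least on some tasks. TimeLimitExceeded is tough, too, because you cannot catch it inside the task; it's handled by the Celery framework. (Its soft-limit counterpart, SoftTimeLimitExceeded, is raised inside the task, which is why the decorator below can intercept it.)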

While django-celery provides a mechanism to supply an exception whitelist, that may not work for you. If you're like me, you don't know which errors you want to receive emails for; by definition, you lack the foresight to trap them ahead of time! In that case, what you really need is an exception BLACKLIST. I have opened a ticket for that; in the meantime, you can wrap your task in the following decorator as a workaround.

import sys
import traceback
from functools import wraps  # needed by email_errors.__call__
from django.core.mail import mail_admins
from django.utils.text import truncate_words
from django.template.defaultfilters import slugify
from celery.exceptions import SoftTimeLimitExceeded
from celery import task as _task

class email_errors(object):

    """ Wraps a celery task in a try/except with custom error types, only
        sending admin alert emails on non-blacklisted exceptions.

        Can be attached as a decorator to a task, such as :

        @email_errors(error_blacklist=[MaybeEncodingError, ])
        @task
        def some_task():
            pass
    """

    default_error_blacklist = [SoftTimeLimitExceeded, ]

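    def __init__(self, error_blacklist=None):
        # Copy the caller's list so that neither it nor a shared default
        # argument is mutated across decorator instances.
        self.error_blacklist = list(error_blacklist or [])
        self.error_blacklist.extend(self.default_error_blacklist)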

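    def __call__(self, task):
        # Keep a reference to the original run method before replacing it.
        task.unsynchronized_run = task.run

        @wraps(task.unsynchronized_run)
        def wrapper(*args, **kwargs):
            try:
                # Return the result so the task's return value is preserved.
                return task.unsynchronized_run(*args, **kwargs)
            except Exception as e:
                # isinstance also matches subclasses of blacklisted errors.
                if isinstance(e, tuple(self.error_blacklist)):
                    # Blacklisted: print the stacktrace instead of emailing it.
                    stacktrace = get_stacktrace(e)
                    print("ERROR: task=%s\n%s" % (task.__name__, stacktrace))
                else:
                    mail_exception(e, prefix="[celery]")

        task.run = wrapper
        return task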

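def get_stacktrace(error):
    # sys.exc_info()[2] is the traceback of the exception being handled;
    # the older sys.exc_traceback is deprecated and does not exist in Python 3.
    return "".join(traceback.format_exception(type(error), error, sys.exc_info()[2]))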


def mail_exception(error, prefix=None):
    """ Mails an exception w/ stacktrace to ADMINS, can pass a prefix like '[celery]' """
    try:
        stacktrace = get_stacktrace(error)
        print(stacktrace)
        # Try each candidate subject in turn until one sends successfully.
        for subject in _error_subjects(error):
            try:
                if prefix:
                    subject = "%s %s" % (prefix, subject)
                mail_admins(subject, "%s\n%s" % (error, stacktrace))
                break
            except Exception:
                # This subject was rejected; fall through to the next one.
                pass
    except Exception as e:
        print(e)

def _error_subjects(error):
    """ returns a list of potential subjects for an error email, some may fail """
    error_str = str(error)
    return [truncate_words(error_str, 5), slugify(truncate_words(error_str, 5)), "mail_exception error"]
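
To see the blacklist pattern in isolation, here is a minimal sketch that runs without Celery or Django. The names report_errors, TimeoutLike, and flaky are made up for illustration, and a plain list stands in for mail_admins; treat it as a rough model of the decorator above, not a replacement for it.

```python
import functools


class TimeoutLike(Exception):
    """Hypothetical stand-in for an ignorable error such as SoftTimeLimitExceeded."""


def report_errors(notify, blacklist=()):
    """Call notify(exc) for any exception whose class is not blacklisted."""
    ignored = tuple(blacklist)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ignored:
                # Blacklisted: swallow the error without reporting it.
                return None
            except Exception as exc:
                # Everything else is reported (an email, in the real decorator).
                notify(exc)
                return None
        return wrapper
    return decorator


reported = []  # collects the exceptions that would have been emailed


@report_errors(reported.append, blacklist=[TimeoutLike])
def flaky(kind):
    if kind == "timeout":
        raise TimeoutLike("took too long")
    if kind == "bug":
        raise ValueError("unexpected input")
    return "ok"
```

Calling flaky("timeout") leaves reported empty, while flaky("bug") appends the ValueError to it. Both calls return None, mirroring how the decorator above swallows the exception after handling it.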

In this scenario, you are sending the error emails yourself, so you will want to disable CELERY_SEND_TASK_ERROR_EMAILS. With that setting disabled, you will also stop receiving the TimeLimitExceeded emails.
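
For reference, the relevant lines in a Django settings module might look roughly like this. The ADMINS entry is a placeholder I made up; mail_admins sends to whatever addresses your project lists there.

```python
# settings.py (fragment)

# Turn off Celery's built-in error emails so the decorator is the only sender.
CELERY_SEND_TASK_ERROR_EMAILS = False

# mail_admins delivers to these recipients; this name and address are placeholders.
ADMINS = (
    ("Ops Team", "ops@example.com"),
)
```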