django-celery: blacklist errors by class
Celery has a built-in mechanism for sending untrapped errors to admins via email. This is great for trouble-shooting. Sending the emails directly to developers is also a good way to make sure they actually get fixed. I know 1,000 emails in my inbox motivates me!
However, you may not want to receive all errors. For example, our celery instance routinely throws TimeLimitExceeded exceptions that can be safely ignored, at least on some tasks. TimeLimitExceeded is tough, too, because you cannot catch it; it's handled by the celery framework.
While django-celery provides a mechanism to supply a exception whitelist, that may not work for you. If you're like me, you don't know what errors you want to receive emails for; by definition you do not have the foresight to trap them ahead of time! In that case, what you really need is an exception BLACKLIST. While I have opened a ticket for that, in the meantime you can use the following decorator around your task as a solution.
import sys import traceback from django.core.mail import mail_admins from django.utils.text import truncate_words from django.template.defaultfilters import slugify from celery.exceptions import SoftTimeLimitExceeded from celery import task as _task class email_errors(object): """ Wraps a celery task in a try/catch with custom error types, only sending admin alert emails on non-blacklisted exceptions. Can be attached as a decorator to a task, such as : @email_errors(error_blacklist=[MaybeEncodingError, ]) @task def some_task(): pass """ default_error_blacklist = [SoftTimeLimitExceeded, ] def __init__(self, error_blacklist=[]): self.error_blacklist = error_blacklist self.error_blacklist.extend(self.default_error_blacklist) def __call__(self, task): task.unsynchronized_run = task.run @wraps(task.unsynchronized_run) def wrapper(*args, **kwargs): try: task.unsynchronized_run(*args, **kwargs) except Exception, e: if type(e) in self.error_blacklist: stacktrace = get_stacktrace(e) print "ERROR: task=%s\n%s" % (task.__name__, stacktrace) else: mail_exception(e, prefix="[celery]") task.run = wrapper return task def get_stacktrace(error): return "".join(traceback.format_exception(type(error), error, sys.exc_traceback)) def mail_exception(error, prefix=None): """ Mails an exception w/ stacktrace to ADMINS, can pass a prefix like '[celery]' """ try: stacktrace = get_stacktrace(error) print stacktrace for subject in _error_subjects(error): try: if prefix: subject = "%s %s" % (prefix, subject) mail_admins(subject, "%s\n%s" % (error, stacktrace)) break except: pass except Exception, e: print e def _error_subjects(error): """ returns a list of potential subjects for an error email, some may fail """ error_str = str(error) return [truncate_words(error_str, 5), slugify(truncate_words(error_str, 5)), "mail_exception error"]
In this scenario, you are sending errors emails yourself. You will want to disable CELERY_SEND_TASK_ERROR_EMAILS. By disabling that setting, you will also not get the TimeLimitExceeded emails.