From 8fedd9ec0729733c74ed95cdc3c92e7c4ddea784 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Fri, 16 Oct 2020 14:31:01 +0200 Subject: [PATCH] stages/email: Implement MonitoredTask, but only for failed emails --- passbook/lib/tasks.py | 25 ++++++++++-------- passbook/stages/email/tasks.py | 47 ++++++++++++++++++++++++---------- 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/passbook/lib/tasks.py b/passbook/lib/tasks.py index 0960d24e9..0e837c559 100644 --- a/passbook/lib/tasks.py +++ b/passbook/lib/tasks.py @@ -69,10 +69,14 @@ class TaskInfo: class MonitoredTask(Task): """Task which can save its state to the cache""" + # For tasks that should only be listed if they failed, set this to False + save_on_success: bool + _result: TaskResult def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) + self.save_on_success = True self._result = TaskResult(status=TaskResultStatus.ERROR, messages=[]) def set_status(self, result: TaskResult): @@ -83,16 +87,17 @@ class MonitoredTask(Task): def after_return( self, status, retval, task_id, args: List[Any], kwargs: Dict[str, Any], einfo ): - TaskInfo( - task_name=self.__name__, - task_description=self.__doc__, - finish_timestamp=datetime.now(), - result=self._result, - task_call_module=self.__module__, - task_call_func=self.__name__, - task_call_args=args, - task_call_kwargs=kwargs, - ).save() + if self.save_on_success: + TaskInfo( + task_name=self.__name__, + task_description=self.__doc__, + finish_timestamp=datetime.now(), + result=self._result, + task_call_module=self.__module__, + task_call_func=self.__name__, + task_call_args=args, + task_call_kwargs=kwargs, + ).save() return super().after_return(status, retval, task_id, args, kwargs, einfo=einfo) # pylint: disable=too-many-arguments diff --git a/passbook/stages/email/tasks.py b/passbook/stages/email/tasks.py index d2a10b482..89a2ce813 100644 --- a/passbook/stages/email/tasks.py +++ b/passbook/stages/email/tasks.py @@ -1,11 +1,14 @@ """email stage tasks""" +from email.utils import make_msgid from smtplib import SMTPException from typing import Any, Dict, List from celery import group from django.core.mail import EmailMultiAlternatives +from django.core.mail.utils import DNS_NAME from structlog import get_logger +from passbook.lib.tasks import MonitoredTask, TaskResult, TaskResultStatus from passbook.root.celery import CELERY_APP from passbook.stages.email.models import EmailStage @@ -16,7 +19,7 @@ def send_mails(stage: EmailStage, *messages: List[EmailMultiAlternatives]): """Wrapper to convert EmailMessage to dict and send it from worker""" tasks = [] for message in messages: - tasks.append(_send_mail_task.s(stage.pk, message.__dict__)) + tasks.append(send_mail.s(stage.pk, message.__dict__)) lazy_group = group(*tasks) promise = lazy_group() return promise @@ -29,19 +32,35 @@ def send_mails(stage: EmailStage, *messages: List[EmailMultiAlternatives]): ConnectionError, ), retry_backoff=True, + base=MonitoredTask, ) -# pylint: disable=unused-argument -def _send_mail_task(self, email_stage_pk: int, message: Dict[Any, Any]): +def send_mail(self: MonitoredTask, email_stage_pk: int, message: Dict[Any, Any]): """Send Email according to EmailStage parameters from background worker. Automatically retries if message couldn't be sent.""" - stage: EmailStage = EmailStage.objects.get(pk=email_stage_pk) - backend = stage.backend - backend.open() - # Since django's EmailMessage objects are not JSON serialisable, - # we need to rebuild them from a dict - message_object = EmailMultiAlternatives() - for key, value in message.items(): - setattr(message_object, key, value) - message_object.from_email = stage.from_address - LOGGER.debug("Sending mail", to=message_object.to) - stage.backend.send_messages([message_object]) + self.save_on_success = False + try: + stage: EmailStage = EmailStage.objects.get(pk=email_stage_pk) + backend = stage.backend + backend.open() + # Since django's EmailMessage objects are not JSON serialisable, + # we need to rebuild them from a dict + message_object = EmailMultiAlternatives() + for key, value in message.items(): + setattr(message_object, key, value) + message_object.from_email = stage.from_address + # Because we use the Message-ID as UID for the task, manually assign it + message_id = make_msgid(DNS_NAME) + message_object.extra_headers["Message-ID"] = message_id + + LOGGER.debug("Sending mail", to=message_object.to) + stage.backend.send_messages([message_object]) + self.set_status( + TaskResult( + TaskResultStatus.SUCCESSFUL, + messages=["Successfully sent Mail."], + uid=message_id, + ) + ) + except (SMTPException, ConnectionError) as exc: + self.set_status(TaskResult(TaskResultStatus.ERROR, [str(exc)], exc)) + raise exc