diff --git a/authentik/admin/tasks.py b/authentik/admin/tasks.py index e5672a132..2c7e91810 100644 --- a/authentik/admin/tasks.py +++ b/authentik/admin/tasks.py @@ -11,7 +11,12 @@ from structlog.stdlib import get_logger from authentik import ENV_GIT_HASH_KEY, __version__ from authentik.events.models import Event, EventAction, Notification -from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.config import CONFIG from authentik.lib.utils.http import get_http_session from authentik.root.celery import CELERY_APP @@ -48,8 +53,9 @@ def clear_update_notifications(): notification.delete() -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def update_latest_version(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def update_latest_version(self: MonitoredTask): """Update latest version info""" if CONFIG.y_bool("disable_update_check"): cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT) diff --git a/authentik/core/tasks.py b/authentik/core/tasks.py index 79e250202..3e18c50b4 100644 --- a/authentik/core/tasks.py +++ b/authentik/core/tasks.py @@ -16,15 +16,21 @@ from kubernetes.config.incluster_config import SERVICE_HOST_ENV_NAME from structlog.stdlib import get_logger from authentik.core.models import AuthenticatedSession, ExpiringModel -from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP LOGGER = get_logger() -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def clean_expired_models(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def clean_expired_models(self: MonitoredTask): """Remove expired objects""" messages = [] for cls in ExpiringModel.__subclasses__(): @@ -62,8 +68,9 @@ def should_backup() -> bool: return True -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def backup_database(self: PrefilledMonitoredTask): # pragma: no cover +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def backup_database(self: MonitoredTask): # pragma: no cover """Database backup""" self.result_timeout_hours = 25 if not should_backup(): diff --git a/authentik/crypto/tasks.py b/authentik/crypto/tasks.py index 06d6b02ee..81e50219c 100644 --- a/authentik/crypto/tasks.py +++ b/authentik/crypto/tasks.py @@ -6,7 +6,12 @@ from django.utils.translation import gettext_lazy as _ from structlog.stdlib import get_logger from authentik.crypto.models import CertificateKeyPair -from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -15,8 +20,9 @@ LOGGER = get_logger() MANAGED_DISCOVERED = "goauthentik.io/crypto/discovered/%s" -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def certificate_discovery(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def certificate_discovery(self: MonitoredTask): """Discover and update certificates form the filesystem""" certs = {} private_keys = {} diff --git a/authentik/events/monitored_tasks.py b/authentik/events/monitored_tasks.py index d7273e0bf..63330cddf 100644 --- a/authentik/events/monitored_tasks.py +++ b/authentik/events/monitored_tasks.py @@ -186,27 +186,21 @@ class MonitoredTask(Task): raise NotImplementedError -class PrefilledMonitoredTask(MonitoredTask): - """Subclass of MonitoredTask, but create entry in cache if task hasn't been run - Does not support UID""" - - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - status = TaskInfo.by_name(self.__name__) - if status: - return - TaskInfo( - task_name=self.__name__, - task_description=self.__doc__, - result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]), - task_call_module=self.__module__, - task_call_func=self.__name__, - # We don't have real values for these attributes but they cannot be null - start_timestamp=default_timer(), - finish_timestamp=default_timer(), - finish_time=datetime.now(), - ).save(86400) - LOGGER.debug("prefilled task", task_name=self.__name__) - - def run(self, *args, **kwargs): - raise NotImplementedError +def prefill_task(func): + """Ensure a task's details are always in cache, so it can always be triggered via API""" + status = TaskInfo.by_name(func.__name__) + if status: + return func + TaskInfo( + task_name=func.__name__, + task_description=func.__doc__, + result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]), + task_call_module=func.__module__, + task_call_func=func.__name__, + # We don't have real values for these attributes but they cannot be null + start_timestamp=default_timer(), + finish_timestamp=default_timer(), + finish_time=datetime.now(), + ).save(86400) + LOGGER.debug("prefilled task", task_name=func.__name__) + return func diff --git a/authentik/managed/tasks.py b/authentik/managed/tasks.py index 118b9c370..2cc9b21d2 100644 --- a/authentik/managed/tasks.py +++ b/authentik/managed/tasks.py @@ -2,12 +2,18 @@ from django.db import DatabaseError from authentik.core.tasks import CELERY_APP -from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.managed.manager import ObjectManager -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def managed_reconcile(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def managed_reconcile(self: MonitoredTask): """Run ObjectManager to ensure objects are up-to-date""" try: ObjectManager().run() diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index 820f585f6..737b1ef15 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -19,9 +19,9 @@ from structlog.stdlib import get_logger from authentik.events.monitored_tasks import ( MonitoredTask, - PrefilledMonitoredTask, TaskResult, TaskResultStatus, + prefill_task, ) from authentik.lib.utils.reflection import path_to_class from authentik.outposts.controllers.base import BaseController, ControllerException @@ -75,8 +75,9 @@ def outpost_service_connection_state(connection_pk: Any): cache.set(connection.state_key, state, timeout=None) -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def outpost_service_connection_monitor(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def outpost_service_connection_monitor(self: MonitoredTask): """Regularly check the state of Outpost Service Connections""" connections = OutpostServiceConnection.objects.all() for connection in connections.iterator(): @@ -124,8 +125,9 @@ def outpost_controller( self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs)) -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def outpost_token_ensurer(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def outpost_token_ensurer(self: MonitoredTask): """Periodically ensure that all Outposts have valid Service Accounts and Tokens""" all_outposts = Outpost.objects.all() diff --git a/authentik/policies/reputation/tasks.py b/authentik/policies/reputation/tasks.py index 49b1590d1..3126dfca8 100644 --- a/authentik/policies/reputation/tasks.py +++ b/authentik/policies/reputation/tasks.py @@ -2,7 +2,12 @@ from django.core.cache import cache from structlog.stdlib import get_logger -from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.policies.reputation.models import IPReputation, UserReputation from authentik.policies.reputation.signals import CACHE_KEY_IP_PREFIX, CACHE_KEY_USER_PREFIX from authentik.root.celery import CELERY_APP @@ -10,8 +15,9 @@ from authentik.root.celery import CELERY_APP LOGGER = get_logger() -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def save_ip_reputation(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def save_ip_reputation(self: MonitoredTask): """Save currently cached reputation to database""" objects_to_update = [] for key, score in cache.get_many(cache.keys(CACHE_KEY_IP_PREFIX + "*")).items(): @@ -23,8 +29,9 @@ def save_ip_reputation(self: PrefilledMonitoredTask): self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated IP Reputation"])) -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def save_user_reputation(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def save_user_reputation(self: MonitoredTask): """Save currently cached reputation to database""" objects_to_update = [] for key, score in cache.get_many(cache.keys(CACHE_KEY_USER_PREFIX + "*")).items(): diff --git a/authentik/sources/saml/tasks.py b/authentik/sources/saml/tasks.py index fb85b3088..cb72d55d0 100644 --- a/authentik/sources/saml/tasks.py +++ b/authentik/sources/saml/tasks.py @@ -3,7 +3,12 @@ from django.utils.timezone import now from structlog.stdlib import get_logger from authentik.core.models import AuthenticatedSession, User -from authentik.events.monitored_tasks import PrefilledMonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.utils.time import timedelta_from_string from authentik.root.celery import CELERY_APP from authentik.sources.saml.models import SAMLSource @@ -11,8 +16,9 @@ from authentik.sources.saml.models import SAMLSource LOGGER = get_logger() -@CELERY_APP.task(bind=True, base=PrefilledMonitoredTask) -def clean_temporary_users(self: PrefilledMonitoredTask): +@CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task +def clean_temporary_users(self: MonitoredTask): """Remove temporary users created by SAML Sources""" _now = now() messages = []