diff --git a/authentik/admin/tasks.py b/authentik/admin/tasks.py index 8db757d1f..6a75f51c9 100644 --- a/authentik/admin/tasks.py +++ b/authentik/admin/tasks.py @@ -11,7 +11,7 @@ from structlog.stdlib import get_logger from authentik import __version__, get_build_hash from authentik.admin.apps import PROM_INFO from authentik.events.models import Event, EventAction, Notification -from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task +from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task from authentik.lib.config import CONFIG from authentik.lib.utils.http import get_http_session from authentik.root.celery import CELERY_APP @@ -49,9 +49,9 @@ def clear_update_notifications(): notification.delete() -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) @prefill_task -def update_latest_version(self: MonitoredTask): +def update_latest_version(self: SystemTask): """Update latest version info""" if CONFIG.get_bool("disable_update_check"): cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT) diff --git a/authentik/blueprints/v1/tasks.py b/authentik/blueprints/v1/tasks.py index 33b517cb0..c23109ff7 100644 --- a/authentik/blueprints/v1/tasks.py +++ b/authentik/blueprints/v1/tasks.py @@ -30,7 +30,7 @@ from authentik.blueprints.v1.importer import Importer from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE from authentik.blueprints.v1.oci import OCI_PREFIX from authentik.events.models import TaskStatus -from authentik.events.monitored_tasks import MonitoredTask, prefill_task +from authentik.events.system_tasks import SystemTask, prefill_task from authentik.events.utils import sanitize_dict from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -124,10 +124,10 @@ def blueprints_find() -> list[BlueprintFile]: @CELERY_APP.task( - throws=(DatabaseError, ProgrammingError, InternalError), base=MonitoredTask, bind=True + throws=(DatabaseError, ProgrammingError, InternalError), base=SystemTask, bind=True ) @prefill_task -def blueprints_discovery(self: MonitoredTask, path: Optional[str] = None): +def blueprints_discovery(self: SystemTask, path: Optional[str] = None): """Find blueprints and check if they need to be created in the database""" count = 0 for blueprint in blueprints_find(): @@ -169,9 +169,9 @@ def check_blueprint_v1_file(blueprint: BlueprintFile): @CELERY_APP.task( bind=True, - base=MonitoredTask, + base=SystemTask, ) -def apply_blueprint(self: MonitoredTask, instance_pk: str): +def apply_blueprint(self: SystemTask, instance_pk: str): """Apply single blueprint""" self.save_on_success = False instance: Optional[BlueprintInstance] = None diff --git a/authentik/core/tasks.py b/authentik/core/tasks.py index 788cbc25e..c3cd5df37 100644 --- a/authentik/core/tasks.py +++ b/authentik/core/tasks.py @@ -13,15 +13,15 @@ from authentik.core.models import ( ExpiringModel, User, ) -from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task +from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task from authentik.root.celery import CELERY_APP LOGGER = get_logger() -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) @prefill_task -def clean_expired_models(self: MonitoredTask): +def clean_expired_models(self: SystemTask): """Remove expired objects""" messages = [] for cls in ExpiringModel.__subclasses__(): @@ -52,9 +52,9 @@ def clean_expired_models(self: MonitoredTask): self.set_status(TaskStatus.SUCCESSFUL, *messages) -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) @prefill_task -def clean_temporary_users(self: MonitoredTask): +def clean_temporary_users(self: SystemTask): """Remove temporary users created by SAML Sources""" _now = datetime.now() messages = [] diff --git a/authentik/crypto/tasks.py b/authentik/crypto/tasks.py index 02fda31a6..066fee8ac 100644 --- a/authentik/crypto/tasks.py +++ b/authentik/crypto/tasks.py @@ -10,7 +10,7 @@ from structlog.stdlib import get_logger from authentik.crypto.models import CertificateKeyPair from authentik.events.models import TaskStatus -from authentik.events.monitored_tasks import MonitoredTask, prefill_task +from authentik.events.system_tasks import SystemTask, prefill_task from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -35,9 +35,9 @@ def ensure_certificate_valid(body: str): return body -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) @prefill_task -def certificate_discovery(self: MonitoredTask): +def certificate_discovery(self: SystemTask): """Discover, import and update certificates from the filesystem""" certs = {} private_keys = {} diff --git a/authentik/events/apps.py b/authentik/events/apps.py index 95cc65ab5..ed911effd 100644 --- a/authentik/events/apps.py +++ b/authentik/events/apps.py @@ -58,7 +58,7 @@ class AuthentikEventsConfig(ManagedAppConfig): def reconcile_prefill_tasks(self): """Prefill tasks""" from authentik.events.models import SystemTask - from authentik.events.monitored_tasks import _prefill_tasks + from authentik.events.system_tasks import _prefill_tasks for task in _prefill_tasks: if SystemTask.objects.filter(name=task.name).exists(): diff --git a/authentik/events/monitored_tasks.py b/authentik/events/system_tasks.py similarity index 92% rename from authentik/events/monitored_tasks.py rename to authentik/events/system_tasks.py index 383bc376c..77a0f1835 100644 --- a/authentik/events/monitored_tasks.py +++ b/authentik/events/system_tasks.py @@ -8,14 +8,16 @@ from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from structlog.stdlib import get_logger -from authentik.events.models import Event, EventAction, SystemTask, TaskStatus +from authentik.events.models import Event, EventAction +from authentik.events.models import SystemTask as DBSystemTask +from authentik.events.models import TaskStatus from authentik.events.utils import sanitize_item from authentik.lib.utils.errors import exception_to_string LOGGER = get_logger() -class MonitoredTask(Task): +class SystemTask(Task): """Task which can save its state to the cache""" # For tasks that should only be listed if they failed, set this to False @@ -59,12 +61,12 @@ class MonitoredTask(Task): if not self._status: return if self._status == TaskStatus.SUCCESSFUL and not self.save_on_success: - SystemTask.objects.filter( + DBSystemTask.objects.filter( name=self.__name__, uid=self._uid, ).delete() return - SystemTask.objects.update_or_create( + DBSystemTask.objects.update_or_create( name=self.__name__, uid=self._uid, defaults={ @@ -88,7 +90,7 @@ class MonitoredTask(Task): if not self._status: self._status = TaskStatus.ERROR self._messages = exception_to_string(exc) - SystemTask.objects.update_or_create( + DBSystemTask.objects.update_or_create( name=self.__name__, uid=self._uid, defaults={ @@ -117,7 +119,7 @@ class MonitoredTask(Task): def prefill_task(func): """Ensure a task's details are always in cache, so it can always be triggered via API""" _prefill_tasks.append( - SystemTask( + DBSystemTask( name=func.__name__, description=func.__doc__, status=TaskStatus.UNKNOWN, diff --git a/authentik/events/tasks.py b/authentik/events/tasks.py index cdcdf9ca1..c89623808 100644 --- a/authentik/events/tasks.py +++ b/authentik/events/tasks.py @@ -15,7 +15,7 @@ from authentik.events.models import ( NotificationTransportError, TaskStatus, ) -from authentik.events.monitored_tasks import MonitoredTask, prefill_task +from authentik.events.system_tasks import SystemTask, prefill_task from authentik.policies.engine import PolicyEngine from authentik.policies.models import PolicyBinding, PolicyEngineMode from authentik.root.celery import CELERY_APP @@ -95,10 +95,10 @@ def event_trigger_handler(event_uuid: str, trigger_name: str): bind=True, autoretry_for=(NotificationTransportError,), retry_backoff=True, - base=MonitoredTask, + base=SystemTask, ) def notification_transport( - self: MonitoredTask, transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str + self: SystemTask, transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str ): """Send notification over specified transport""" self.save_on_success = False @@ -133,9 +133,9 @@ def gdpr_cleanup(user_pk: int): events.delete() -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) @prefill_task -def notification_cleanup(self: MonitoredTask): +def notification_cleanup(self: SystemTask): """Cleanup seen notifications and notifications whose event expired.""" notifications = Notification.objects.filter(Q(event=None) | Q(seen=True)) amount = notifications.count() diff --git a/authentik/events/tests/test_tasks.py b/authentik/events/tests/test_tasks.py index 8ba898753..0e3a3cd5d 100644 --- a/authentik/events/tests/test_tasks.py +++ b/authentik/events/tests/test_tasks.py @@ -6,8 +6,9 @@ from rest_framework.test import APITestCase from authentik.core.tasks import clean_expired_models from authentik.core.tests.utils import create_test_admin_user -from authentik.events.models import SystemTask, TaskStatus -from authentik.events.monitored_tasks import MonitoredTask +from authentik.events.models import SystemTask as DBSystemTask +from authentik.events.models import TaskStatus +from authentik.events.system_tasks import SystemTask from authentik.lib.generators import generate_id from authentik.root.celery import CELERY_APP @@ -28,9 +29,9 @@ class TestSystemTasks(APITestCase): @CELERY_APP.task( bind=True, - base=MonitoredTask, + base=SystemTask, ) - def test_task(self: MonitoredTask): + def test_task(self: SystemTask): self.save_on_success = False self.set_uid(uid) self.set_status(TaskStatus.ERROR if should_fail else TaskStatus.SUCCESSFUL) @@ -38,18 +39,18 @@ class TestSystemTasks(APITestCase): # First test successful run should_fail = False test_task.delay().get() - self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid).first()) + self.assertIsNone(DBSystemTask.objects.filter(name="test_task", uid=uid).first()) # Then test failed should_fail = True test_task.delay().get() - task = SystemTask.objects.filter(name="test_task", uid=uid).first() + task = DBSystemTask.objects.filter(name="test_task", uid=uid).first() self.assertEqual(task.status, TaskStatus.ERROR) # Then after that, the state should be removed should_fail = False test_task.delay().get() - self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid).first()) + self.assertIsNone(DBSystemTask.objects.filter(name="test_task", uid=uid).first()) def test_tasks(self): """Test Task API""" @@ -62,7 +63,7 @@ class TestSystemTasks(APITestCase): def test_tasks_single(self): """Test Task API (read single)""" clean_expired_models.delay().get() - task = SystemTask.objects.filter(name="clean_expired_models").first() + task = DBSystemTask.objects.filter(name="clean_expired_models").first() response = self.client.get( reverse( "authentik_api:systemtask-detail", @@ -81,7 +82,7 @@ class TestSystemTasks(APITestCase): def test_tasks_run(self): """Test Task API (run)""" clean_expired_models.delay().get() - task = SystemTask.objects.filter(name="clean_expired_models").first() + task = DBSystemTask.objects.filter(name="clean_expired_models").first() response = self.client.post( reverse( "authentik_api:systemtask-run", diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index 93121b90e..72309a8e3 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -20,7 +20,7 @@ from yaml import safe_load from authentik.enterprise.providers.rac.controllers.docker import RACDockerController from authentik.enterprise.providers.rac.controllers.kubernetes import RACKubernetesController from authentik.events.models import TaskStatus -from authentik.events.monitored_tasks import MonitoredTask, prefill_task +from authentik.events.system_tasks import SystemTask, prefill_task from authentik.lib.config import CONFIG from authentik.lib.utils.reflection import path_to_class from authentik.outposts.consumer import OUTPOST_GROUP @@ -104,11 +104,11 @@ def outpost_service_connection_state(connection_pk: Any): @CELERY_APP.task( bind=True, - base=MonitoredTask, + base=SystemTask, throws=(DatabaseError, ProgrammingError, InternalError), ) @prefill_task -def outpost_service_connection_monitor(self: MonitoredTask): +def outpost_service_connection_monitor(self: SystemTask): """Regularly check the state of Outpost Service Connections""" connections = OutpostServiceConnection.objects.all() for connection in connections.iterator(): @@ -128,9 +128,9 @@ def outpost_controller_all(): outpost_controller.delay(outpost.pk.hex, "up", from_cache=False) -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) def outpost_controller( - self: MonitoredTask, outpost_pk: str, action: str = "up", from_cache: bool = False + self: SystemTask, outpost_pk: str, action: str = "up", from_cache: bool = False ): """Create/update/monitor/delete the deployment of an Outpost""" logs = [] @@ -162,9 +162,9 @@ def outpost_controller( self.set_status(TaskStatus.SUCCESSFUL, *logs) -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) @prefill_task -def outpost_token_ensurer(self: MonitoredTask): +def outpost_token_ensurer(self: SystemTask): """Periodically ensure that all Outposts have valid Service Accounts and Tokens""" all_outposts = Outpost.objects.all() @@ -248,10 +248,10 @@ def _outpost_single_update(outpost: Outpost, layer=None): @CELERY_APP.task( - base=MonitoredTask, + base=SystemTask, bind=True, ) -def outpost_connection_discovery(self: MonitoredTask): +def outpost_connection_discovery(self: SystemTask): """Checks the local environment and create Service connections.""" messages = [] if not CONFIG.get_bool("outposts.discover"): diff --git a/authentik/policies/reputation/tasks.py b/authentik/policies/reputation/tasks.py index 0f9532bc6..94d080d7c 100644 --- a/authentik/policies/reputation/tasks.py +++ b/authentik/policies/reputation/tasks.py @@ -5,7 +5,7 @@ from structlog.stdlib import get_logger from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR from authentik.events.models import TaskStatus -from authentik.events.monitored_tasks import MonitoredTask, prefill_task +from authentik.events.system_tasks import SystemTask, prefill_task from authentik.policies.reputation.models import Reputation from authentik.policies.reputation.signals import CACHE_KEY_PREFIX from authentik.root.celery import CELERY_APP @@ -13,9 +13,9 @@ from authentik.root.celery import CELERY_APP LOGGER = get_logger() -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task(bind=True, base=SystemTask) @prefill_task -def save_reputation(self: MonitoredTask): +def save_reputation(self: SystemTask): """Save currently cached reputation to database""" objects_to_update = [] for _, score in cache.get_many(cache.keys(CACHE_KEY_PREFIX + "*")).items(): diff --git a/authentik/providers/scim/tasks.py b/authentik/providers/scim/tasks.py index e88e2c814..5b529dede 100644 --- a/authentik/providers/scim/tasks.py +++ b/authentik/providers/scim/tasks.py @@ -11,7 +11,7 @@ from structlog.stdlib import get_logger from authentik.core.models import Group, User from authentik.events.models import TaskStatus -from authentik.events.monitored_tasks import MonitoredTask +from authentik.events.system_tasks import SystemTask from authentik.lib.utils.reflection import path_to_class from authentik.providers.scim.clients import PAGE_SIZE, PAGE_TIMEOUT from authentik.providers.scim.clients.base import SCIMClient @@ -40,8 +40,8 @@ def scim_sync_all(): scim_sync.delay(provider.pk) -@CELERY_APP.task(bind=True, base=MonitoredTask) -def scim_sync(self: MonitoredTask, provider_pk: int) -> None: +@CELERY_APP.task(bind=True, base=SystemTask) +def scim_sync(self: SystemTask, provider_pk: int) -> None: """Run SCIM full sync for provider""" provider: SCIMProvider = SCIMProvider.objects.filter( pk=provider_pk, backchannel_application__isnull=False diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py index 6931affa2..0b5dd5ab2 100644 --- a/authentik/sources/ldap/tasks.py +++ b/authentik/sources/ldap/tasks.py @@ -8,8 +8,9 @@ from ldap3.core.exceptions import LDAPException from redis.exceptions import LockError from structlog.stdlib import get_logger -from authentik.events.models import SystemTask, TaskStatus -from authentik.events.monitored_tasks import MonitoredTask +from authentik.events.models import SystemTask as DBSystemTask +from authentik.events.models import TaskStatus +from authentik.events.system_tasks import SystemTask from authentik.lib.config import CONFIG from authentik.lib.utils.errors import exception_to_string from authentik.lib.utils.reflection import class_to_path, path_to_class @@ -69,7 +70,7 @@ def ldap_sync_single(source_pk: str): try: with lock: # Delete all sync tasks from the cache - SystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete() + DBSystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete() task = chain( # User and group sync can happen at once, they have no dependencies on each other group( @@ -102,11 +103,11 @@ def ldap_sync_paginator(source: LDAPSource, sync: type[BaseLDAPSynchronizer]) -> @CELERY_APP.task( bind=True, - base=MonitoredTask, + base=SystemTask, soft_time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours"), task_time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours"), ) -def ldap_sync(self: MonitoredTask, source_pk: str, sync_class: str, page_cache_key: str): +def ldap_sync(self: SystemTask, source_pk: str, sync_class: str, page_cache_key: str): """Synchronization of an LDAP Source""" self.result_timeout_hours = CONFIG.get_int("ldap.task_timeout_hours") source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first() diff --git a/authentik/sources/ldap/tests/test_sync.py b/authentik/sources/ldap/tests/test_sync.py index 32c042be5..5012007bd 100644 --- a/authentik/sources/ldap/tests/test_sync.py +++ b/authentik/sources/ldap/tests/test_sync.py @@ -8,7 +8,7 @@ from authentik.blueprints.tests import apply_blueprint from authentik.core.models import Group, User from authentik.core.tests.utils import create_test_admin_user from authentik.events.models import Event, EventAction, SystemTask -from authentik.events.monitored_tasks import TaskStatus +from authentik.events.system_tasks import TaskStatus from authentik.lib.generators import generate_id, generate_key from authentik.lib.utils.reflection import class_to_path from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource diff --git a/authentik/sources/oauth/tasks.py b/authentik/sources/oauth/tasks.py index caa284729..cfac09f09 100644 --- a/authentik/sources/oauth/tasks.py +++ b/authentik/sources/oauth/tasks.py @@ -5,7 +5,7 @@ from requests import RequestException from structlog.stdlib import get_logger from authentik.events.models import TaskStatus -from authentik.events.monitored_tasks import MonitoredTask +from authentik.events.system_tasks import SystemTask from authentik.lib.utils.http import get_http_session from authentik.root.celery import CELERY_APP from authentik.sources.oauth.models import OAuthSource @@ -13,8 +13,8 @@ from authentik.sources.oauth.models import OAuthSource LOGGER = get_logger() -@CELERY_APP.task(bind=True, base=MonitoredTask) -def update_well_known_jwks(self: MonitoredTask): +@CELERY_APP.task(bind=True, base=SystemTask) +def update_well_known_jwks(self: SystemTask): """Update OAuth sources' config from well_known, and JWKS info from the configured URL""" session = get_http_session() messages = [] diff --git a/authentik/sources/plex/tasks.py b/authentik/sources/plex/tasks.py index 0934712a6..63c8b83b0 100644 --- a/authentik/sources/plex/tasks.py +++ b/authentik/sources/plex/tasks.py @@ -2,7 +2,7 @@ from requests import RequestException from authentik.events.models import Event, EventAction, TaskStatus -from authentik.events.monitored_tasks import MonitoredTask +from authentik.events.system_tasks import SystemTask from authentik.lib.utils.errors import exception_to_string from authentik.root.celery import CELERY_APP from authentik.sources.plex.models import PlexSource @@ -16,8 +16,8 @@ def check_plex_token_all(): check_plex_token.delay(source.slug) -@CELERY_APP.task(bind=True, base=MonitoredTask) -def check_plex_token(self: MonitoredTask, source_slug: int): +@CELERY_APP.task(bind=True, base=SystemTask) +def check_plex_token(self: SystemTask, source_slug: int): """Check the validity of a Plex source.""" sources = PlexSource.objects.filter(slug=source_slug) if not sources.exists(): diff --git a/authentik/stages/email/tasks.py b/authentik/stages/email/tasks.py index 51a0c6260..407995b29 100644 --- a/authentik/stages/email/tasks.py +++ b/authentik/stages/email/tasks.py @@ -10,7 +10,7 @@ from django.utils.text import slugify from structlog.stdlib import get_logger from authentik.events.models import Event, EventAction, TaskStatus -from authentik.events.monitored_tasks import MonitoredTask +from authentik.events.system_tasks import SystemTask from authentik.root.celery import CELERY_APP from authentik.stages.email.models import EmailStage from authentik.stages.email.utils import logo_data @@ -44,9 +44,9 @@ def get_email_body(email: EmailMultiAlternatives) -> str: OSError, ), retry_backoff=True, - base=MonitoredTask, + base=SystemTask, ) -def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Optional[str] = None): +def send_mail(self: SystemTask, message: dict[Any, Any], email_stage_pk: Optional[str] = None): """Send Email for Email Stage. Retries are scheduled automatically.""" self.save_on_success = False message_id = make_msgid(domain=DNS_NAME)