From 4b7399f4544f1ed3b8e0d6c8570f7c0a17704004 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Thu, 14 Oct 2021 12:21:28 +0200 Subject: [PATCH] *: add @prefill_task() decorator to "pre-fill" tasks in cache, so they can be executed even before their schedule would do so Signed-off-by: Jens Langhammer --- authentik/admin/tasks.py | 8 +++++++- authentik/core/tasks.py | 9 ++++++++- authentik/events/monitored_tasks.py | 26 ++++++++++++++++++++++++++ authentik/managed/tasks.py | 8 +++++++- authentik/outposts/tasks.py | 9 ++++++++- authentik/policies/reputation/tasks.py | 9 ++++++++- authentik/sources/saml/tasks.py | 8 +++++++- 7 files changed, 71 insertions(+), 6 deletions(-) diff --git a/authentik/admin/tasks.py b/authentik/admin/tasks.py index 4149b210f..1ddf43870 100644 --- a/authentik/admin/tasks.py +++ b/authentik/admin/tasks.py @@ -11,7 +11,12 @@ from structlog.stdlib import get_logger from authentik import ENV_GIT_HASH_KEY, __version__ from authentik.events.models import Event, EventAction, Notification -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.config import CONFIG from authentik.lib.utils.http import get_http_session from authentik.root.celery import CELERY_APP @@ -48,6 +53,7 @@ def clear_update_notifications(): @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def update_latest_version(self: MonitoredTask): """Update latest version info""" if CONFIG.y_bool("disable_update_check"): diff --git a/authentik/core/tasks.py b/authentik/core/tasks.py index c2b824c52..844560550 100644 --- a/authentik/core/tasks.py +++ b/authentik/core/tasks.py @@ -15,7 +15,12 @@ from kubernetes.config.incluster_config import SERVICE_HOST_ENV_NAME from structlog.stdlib import get_logger from authentik.core.models import AuthenticatedSession, ExpiringModel -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.config import CONFIG from authentik.root.celery import CELERY_APP @@ -23,6 +28,7 @@ LOGGER = get_logger() @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def clean_expired_models(self: MonitoredTask): """Remove expired objects""" messages = [] @@ -50,6 +56,7 @@ def clean_expired_models(self: MonitoredTask): @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def backup_database(self: MonitoredTask): # pragma: no cover """Database backup""" self.result_timeout_hours = 25 diff --git a/authentik/events/monitored_tasks.py b/authentik/events/monitored_tasks.py index b45494384..d867bba78 100644 --- a/authentik/events/monitored_tasks.py +++ b/authentik/events/monitored_tasks.py @@ -7,7 +7,9 @@ from typing import Any, Optional from celery import Task from django.core.cache import cache +from django.utils.translation import gettext_lazy as _ from prometheus_client import Gauge +from structlog.stdlib import get_logger from authentik.events.models import Event, EventAction from authentik.lib.utils.errors import exception_to_string @@ -18,6 +20,8 @@ GAUGE_TASKS = Gauge( ["task_name", "task_uid", "status"], ) +LOGGER = get_logger() + class TaskResultStatus(Enum): """Possible states of tasks""" @@ -25,6 +29,7 @@ class TaskResultStatus(Enum): SUCCESSFUL = 1 WARNING = 2 ERROR = 4 + UNKNOWN = 8 @dataclass @@ -107,6 +112,27 @@ class TaskInfo: cache.set(key, self, timeout=timeout_hours * 60 * 60) +def prefill_task(): + """Ensure a task's details are always in cache, so it can always be triggered via API""" + + def inner_wrap(func): + TaskInfo( + task_name=func.__name__, + task_description=func.__doc__, + result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]), + task_call_module=func.__module__, + task_call_func=func.__name__, + # We don't have real values for these attributes but they cannot be null + start_timestamp=default_timer(), + finish_timestamp=default_timer(), + finish_time=datetime.now(), + ).save(86400) + LOGGER.debug("prefilled task", task_name=func.__name__) + return func + + return inner_wrap + + class MonitoredTask(Task): """Task which can save its state to the cache""" diff --git a/authentik/managed/tasks.py b/authentik/managed/tasks.py index 589ccf3c2..978b458b0 100644 --- a/authentik/managed/tasks.py +++ b/authentik/managed/tasks.py @@ -2,11 +2,17 @@ from django.db import DatabaseError from authentik.core.tasks import CELERY_APP -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.managed.manager import ObjectManager @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def managed_reconcile(self: MonitoredTask): """Run ObjectManager to ensure objects are up-to-date""" try: diff --git a/authentik/outposts/tasks.py b/authentik/outposts/tasks.py index fb2db9c77..63d7f4370 100644 --- a/authentik/outposts/tasks.py +++ b/authentik/outposts/tasks.py @@ -17,7 +17,12 @@ from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION from structlog.stdlib import get_logger -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.utils.reflection import path_to_class from authentik.outposts.controllers.base import BaseController, ControllerException from authentik.outposts.models import ( @@ -71,6 +76,7 @@ def outpost_service_connection_state(connection_pk: Any): @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def outpost_service_connection_monitor(self: MonitoredTask): """Regularly check the state of Outpost Service Connections""" connections = OutpostServiceConnection.objects.all() @@ -120,6 +126,7 @@ def outpost_controller( @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def outpost_token_ensurer(self: MonitoredTask): """Periodically ensure that all Outposts have valid Service Accounts and Tokens""" diff --git a/authentik/policies/reputation/tasks.py b/authentik/policies/reputation/tasks.py index 503d3a739..eb6dcd6e3 100644 --- a/authentik/policies/reputation/tasks.py +++ b/authentik/policies/reputation/tasks.py @@ -2,7 +2,12 @@ from django.core.cache import cache from structlog.stdlib import get_logger -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.policies.reputation.models import IPReputation, UserReputation from authentik.policies.reputation.signals import CACHE_KEY_IP_PREFIX, CACHE_KEY_USER_PREFIX from authentik.root.celery import CELERY_APP @@ -11,6 +16,7 @@ LOGGER = get_logger() @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def save_ip_reputation(self: MonitoredTask): """Save currently cached reputation to database""" objects_to_update = [] @@ -24,6 +30,7 @@ def save_ip_reputation(self: MonitoredTask): @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def save_user_reputation(self: MonitoredTask): """Save currently cached reputation to database""" objects_to_update = [] diff --git a/authentik/sources/saml/tasks.py b/authentik/sources/saml/tasks.py index 91ec96b85..5c495956f 100644 --- a/authentik/sources/saml/tasks.py +++ b/authentik/sources/saml/tasks.py @@ -3,7 +3,12 @@ from django.utils.timezone import now from structlog.stdlib import get_logger from authentik.core.models import AuthenticatedSession, User -from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.events.monitored_tasks import ( + MonitoredTask, + TaskResult, + TaskResultStatus, + prefill_task, +) from authentik.lib.utils.time import timedelta_from_string from authentik.root.celery import CELERY_APP from authentik.sources.saml.models import SAMLSource @@ -12,6 +17,7 @@ LOGGER = get_logger() @CELERY_APP.task(bind=True, base=MonitoredTask) +@prefill_task() def clean_temporary_users(self: MonitoredTask): """Remove temporary users created by SAML Sources""" _now = now()