*: add @prefill_task() decorator to "pre-fill" tasks in cache, so they can be executed even before their schedule would do so

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-10-14 12:21:28 +02:00
parent 27982a771c
commit 4b7399f454
7 changed files with 71 additions and 6 deletions

View file

@ -11,7 +11,12 @@ from structlog.stdlib import get_logger
from authentik import ENV_GIT_HASH_KEY, __version__
from authentik.events.models import Event, EventAction, Notification
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.config import CONFIG
from authentik.lib.utils.http import get_http_session
from authentik.root.celery import CELERY_APP
@ -48,6 +53,7 @@ def clear_update_notifications():
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def update_latest_version(self: MonitoredTask):
"""Update latest version info"""
if CONFIG.y_bool("disable_update_check"):

View file

@ -15,7 +15,12 @@ from kubernetes.config.incluster_config import SERVICE_HOST_ENV_NAME
from structlog.stdlib import get_logger
from authentik.core.models import AuthenticatedSession, ExpiringModel
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.config import CONFIG
from authentik.root.celery import CELERY_APP
@ -23,6 +28,7 @@ LOGGER = get_logger()
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def clean_expired_models(self: MonitoredTask):
"""Remove expired objects"""
messages = []
@ -50,6 +56,7 @@ def clean_expired_models(self: MonitoredTask):
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def backup_database(self: MonitoredTask): # pragma: no cover
"""Database backup"""
self.result_timeout_hours = 25

View file

@ -7,7 +7,9 @@ from typing import Any, Optional
from celery import Task
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from prometheus_client import Gauge
from structlog.stdlib import get_logger
from authentik.events.models import Event, EventAction
from authentik.lib.utils.errors import exception_to_string
@ -18,6 +20,8 @@ GAUGE_TASKS = Gauge(
["task_name", "task_uid", "status"],
)
LOGGER = get_logger()
class TaskResultStatus(Enum):
"""Possible states of tasks"""
@ -25,6 +29,7 @@ class TaskResultStatus(Enum):
SUCCESSFUL = 1
WARNING = 2
ERROR = 4
UNKNOWN = 8
@dataclass
@ -107,6 +112,27 @@ class TaskInfo:
cache.set(key, self, timeout=timeout_hours * 60 * 60)
def prefill_task():
"""Ensure a task's details are always in cache, so it can always be triggered via API"""
def inner_wrap(func):
TaskInfo(
task_name=func.__name__,
task_description=func.__doc__,
result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]),
task_call_module=func.__module__,
task_call_func=func.__name__,
# We don't have real values for these attributes but they cannot be null
start_timestamp=default_timer(),
finish_timestamp=default_timer(),
finish_time=datetime.now(),
).save(86400)
LOGGER.debug("prefilled task", task_name=func.__name__)
return func
return inner_wrap
class MonitoredTask(Task):
"""Task which can save its state to the cache"""

View file

@ -2,11 +2,17 @@
from django.db import DatabaseError
from authentik.core.tasks import CELERY_APP
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.managed.manager import ObjectManager
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def managed_reconcile(self: MonitoredTask):
"""Run ObjectManager to ensure objects are up-to-date"""
try:

View file

@ -17,7 +17,12 @@ from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME
from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION
from structlog.stdlib import get_logger
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.utils.reflection import path_to_class
from authentik.outposts.controllers.base import BaseController, ControllerException
from authentik.outposts.models import (
@ -71,6 +76,7 @@ def outpost_service_connection_state(connection_pk: Any):
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def outpost_service_connection_monitor(self: MonitoredTask):
"""Regularly check the state of Outpost Service Connections"""
connections = OutpostServiceConnection.objects.all()
@ -120,6 +126,7 @@ def outpost_controller(
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def outpost_token_ensurer(self: MonitoredTask):
"""Periodically ensure that all Outposts have valid Service Accounts
and Tokens"""

View file

@ -2,7 +2,12 @@
from django.core.cache import cache
from structlog.stdlib import get_logger
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.policies.reputation.models import IPReputation, UserReputation
from authentik.policies.reputation.signals import CACHE_KEY_IP_PREFIX, CACHE_KEY_USER_PREFIX
from authentik.root.celery import CELERY_APP
@ -11,6 +16,7 @@ LOGGER = get_logger()
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def save_ip_reputation(self: MonitoredTask):
"""Save currently cached reputation to database"""
objects_to_update = []
@ -24,6 +30,7 @@ def save_ip_reputation(self: MonitoredTask):
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def save_user_reputation(self: MonitoredTask):
"""Save currently cached reputation to database"""
objects_to_update = []

View file

@ -3,7 +3,12 @@ from django.utils.timezone import now
from structlog.stdlib import get_logger
from authentik.core.models import AuthenticatedSession, User
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.lib.utils.time import timedelta_from_string
from authentik.root.celery import CELERY_APP
from authentik.sources.saml.models import SAMLSource
@ -12,6 +17,7 @@ LOGGER = get_logger()
@CELERY_APP.task(bind=True, base=MonitoredTask)
@prefill_task()
def clean_temporary_users(self: MonitoredTask):
"""Remove temporary users created by SAML Sources"""
_now = now()