events: migrate system tasks to save in DB

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens Langhammer 2024-01-13 20:31:45 +01:00
parent 7c116acf0f
commit 7fd9d31101
No known key found for this signature in database
30 changed files with 699 additions and 602 deletions

View file

@ -1,134 +0,0 @@
"""Tasks API"""
from importlib import import_module
from django.contrib import messages
from django.http.response import Http404
from django.utils.translation import gettext_lazy as _
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema
from rest_framework.decorators import action
from rest_framework.fields import (
CharField,
ChoiceField,
DateTimeField,
ListField,
SerializerMethodField,
)
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from structlog.stdlib import get_logger
from authentik.api.decorators import permission_required
from authentik.core.api.utils import PassiveSerializer
from authentik.events.monitored_tasks import TaskInfo, TaskResultStatus
from authentik.rbac.permissions import HasPermission
LOGGER = get_logger()
class TaskSerializer(PassiveSerializer):
"""Serialize TaskInfo and TaskResult"""
task_name = CharField()
task_description = CharField()
task_finish_timestamp = DateTimeField(source="finish_time")
task_duration = SerializerMethodField()
status = ChoiceField(
source="result.status.name",
choices=[(x.name, x.name) for x in TaskResultStatus],
)
messages = ListField(source="result.messages")
def get_task_duration(self, instance: TaskInfo) -> int:
"""Get the duration a task took to run"""
return max(instance.finish_timestamp - instance.start_timestamp, 0)
def to_representation(self, instance: TaskInfo):
"""When a new version of authentik adds fields to TaskInfo,
the API will fail with an AttributeError, as the classes
are pickled in cache. In that case, just delete the info"""
try:
return super().to_representation(instance)
# pylint: disable=broad-except
except Exception: # pragma: no cover
if isinstance(self.instance, list):
for inst in self.instance:
inst.delete()
else:
self.instance.delete()
return {}
class TaskViewSet(ViewSet):
"""Read-only view set that returns all background tasks"""
permission_classes = [HasPermission("authentik_rbac.view_system_tasks")]
serializer_class = TaskSerializer
@extend_schema(
responses={
200: TaskSerializer(many=False),
404: OpenApiResponse(description="Task not found"),
},
parameters=[
OpenApiParameter(
"id",
type=OpenApiTypes.STR,
location=OpenApiParameter.PATH,
required=True,
),
],
)
def retrieve(self, request: Request, pk=None) -> Response:
"""Get a single system task"""
task = TaskInfo.by_name(pk)
if not task:
raise Http404
return Response(TaskSerializer(task, many=False).data)
@extend_schema(responses={200: TaskSerializer(many=True)})
def list(self, request: Request) -> Response:
"""List system tasks"""
tasks = sorted(TaskInfo.all().values(), key=lambda task: task.task_name)
return Response(TaskSerializer(tasks, many=True).data)
@permission_required(None, ["authentik_rbac.run_system_tasks"])
@extend_schema(
request=OpenApiTypes.NONE,
responses={
204: OpenApiResponse(description="Task retried successfully"),
404: OpenApiResponse(description="Task not found"),
500: OpenApiResponse(description="Failed to retry task"),
},
parameters=[
OpenApiParameter(
"id",
type=OpenApiTypes.STR,
location=OpenApiParameter.PATH,
required=True,
),
],
)
@action(detail=True, methods=["post"])
def retry(self, request: Request, pk=None) -> Response:
"""Retry task"""
task = TaskInfo.by_name(pk)
if not task:
raise Http404
try:
task_module = import_module(task.task_call_module)
task_func = getattr(task_module, task.task_call_func)
LOGGER.debug("Running task", task=task_func)
task_func.delay(*task.task_call_args, **task.task_call_kwargs)
messages.success(
self.request,
_("Successfully re-scheduled Task %(name)s!" % {"name": task.task_name}),
)
return Response(status=204)
except (ImportError, AttributeError): # pragma: no cover
LOGGER.warning("Failed to run task, remove state", task=task)
# if we get an import error, the module path has probably changed
task.delete()
return Response(status=500)

View file

@ -1,7 +1,6 @@
"""admin signals"""
from django.dispatch import receiver
from authentik.admin.api.tasks import TaskInfo
from authentik.admin.apps import GAUGE_WORKERS
from authentik.root.celery import CELERY_APP
from authentik.root.monitoring import monitoring_set
@ -12,10 +11,3 @@ def monitoring_set_workers(sender, **kwargs):
"""Set worker gauge"""
count = len(CELERY_APP.control.ping(timeout=0.5))
GAUGE_WORKERS.set(count)
@receiver(monitoring_set)
def monitoring_set_tasks(sender, **kwargs):
"""Set task gauges"""
for task in TaskInfo.all().values():
task.update_metrics()

View file

@ -11,12 +11,7 @@ from structlog.stdlib import get_logger
from authentik import __version__, get_build_hash
from authentik.admin.apps import PROM_INFO
from authentik.events.models import Event, EventAction, Notification
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task
from authentik.lib.config import CONFIG
from authentik.lib.utils.http import get_http_session
from authentik.root.celery import CELERY_APP
@ -60,7 +55,7 @@ def update_latest_version(self: MonitoredTask):
"""Update latest version info"""
if CONFIG.get_bool("disable_update_check"):
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
self.set_status(TaskResult(TaskResultStatus.WARNING, messages=["Version check disabled."]))
self.set_status(TaskStatus.WARNING, "Version check disabled.")
return
try:
response = get_http_session().get(
@ -70,9 +65,7 @@ def update_latest_version(self: MonitoredTask):
data = response.json()
upstream_version = data.get("stable", {}).get("version")
cache.set(VERSION_CACHE_KEY, upstream_version, VERSION_CACHE_TIMEOUT)
self.set_status(
TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated latest Version"])
)
self.set_status(TaskStatus.SUCCESSFUL, "Successfully updated latest Version")
_set_prom_info()
# Check if upstream version is newer than what we're running,
# and if no event exists yet, create one.
@ -89,7 +82,7 @@ def update_latest_version(self: MonitoredTask):
Event.new(EventAction.UPDATE_AVAILABLE, **event_dict).save()
except (RequestException, IndexError) as exc:
cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)
_set_prom_info()

View file

@ -8,7 +8,7 @@ from authentik import __version__
from authentik.blueprints.tests import reconcile_app
from authentik.core.models import Group, User
from authentik.core.tasks import clean_expired_models
from authentik.events.monitored_tasks import TaskResultStatus
from authentik.events.monitored_tasks import TaskStatus
from authentik.lib.generators import generate_id
@ -42,7 +42,7 @@ class TestAdminAPI(TestCase):
)
self.assertEqual(response.status_code, 200)
body = loads(response.content)
self.assertEqual(body["status"], TaskResultStatus.SUCCESSFUL.name)
self.assertEqual(body["status"], TaskStatus.SUCCESSFUL.name)
self.assertEqual(body["task_name"], "clean_expired_models")
response = self.client.get(
reverse("authentik_api:admin_system_tasks-detail", kwargs={"pk": "qwerqwer"})

View file

@ -4,12 +4,10 @@ from django.urls import path
from authentik.admin.api.meta import AppsViewSet, ModelViewSet
from authentik.admin.api.metrics import AdministrationMetricsViewSet
from authentik.admin.api.system import SystemView
from authentik.admin.api.tasks import TaskViewSet
from authentik.admin.api.version import VersionView
from authentik.admin.api.workers import WorkerView
api_urlpatterns = [
("admin/system_tasks", TaskViewSet, "admin_system_tasks"),
("admin/apps", AppsViewSet, "apps"),
("admin/models", ModelViewSet, "models"),
path(

View file

@ -29,12 +29,8 @@ from authentik.blueprints.v1.common import BlueprintLoader, BlueprintMetadata, E
from authentik.blueprints.v1.importer import Importer
from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE
from authentik.blueprints.v1.oci import OCI_PREFIX
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.events.models import TaskStatus
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
from authentik.events.utils import sanitize_dict
from authentik.lib.config import CONFIG
from authentik.root.celery import CELERY_APP
@ -140,10 +136,7 @@ def blueprints_discovery(self: MonitoredTask, path: Optional[str] = None):
check_blueprint_v1_file(blueprint)
count += 1
self.set_status(
TaskResult(
TaskResultStatus.SUCCESSFUL,
messages=[_("Successfully imported %(count)d files." % {"count": count})],
)
TaskStatus.SUCCESSFUL, _("Successfully imported %(count)d files." % {"count": count})
)
@ -196,18 +189,18 @@ def apply_blueprint(self: MonitoredTask, instance_pk: str):
if not valid:
instance.status = BlueprintInstanceStatus.ERROR
instance.save()
self.set_status(TaskResult(TaskResultStatus.ERROR, [x["event"] for x in logs]))
self.set_status(TaskStatus.ERROR, *[x["event"] for x in logs])
return
applied = importer.apply()
if not applied:
instance.status = BlueprintInstanceStatus.ERROR
instance.save()
self.set_status(TaskResult(TaskResultStatus.ERROR, "Failed to apply"))
self.set_status(TaskStatus.ERROR, "Failed to apply")
return
instance.status = BlueprintInstanceStatus.SUCCESSFUL
instance.last_applied_hash = file_hash
instance.last_applied = now()
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
self.set_status(TaskStatus.SUCCESSFUL)
except (
DatabaseError,
ProgrammingError,
@ -218,7 +211,7 @@ def apply_blueprint(self: MonitoredTask, instance_pk: str):
) as exc:
if instance:
instance.status = BlueprintInstanceStatus.ERROR
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)
finally:
if instance:
instance.save()

View file

@ -13,12 +13,7 @@ from authentik.core.models import (
ExpiringModel,
User,
)
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.events.monitored_tasks import MonitoredTask, TaskStatus, prefill_task
from authentik.root.celery import CELERY_APP
LOGGER = get_logger()
@ -54,7 +49,7 @@ def clean_expired_models(self: MonitoredTask):
amount += 1
LOGGER.debug("Expired sessions", model=AuthenticatedSession, amount=amount)
messages.append(f"Expired {amount} {AuthenticatedSession._meta.verbose_name_plural}")
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, messages))
self.set_status(TaskStatus.SUCCESSFUL, *messages)
@CELERY_APP.task(bind=True, base=MonitoredTask)
@ -75,4 +70,4 @@ def clean_temporary_users(self: MonitoredTask):
user.delete()
deleted_users += 1
messages.append(f"Successfully deleted {deleted_users} users.")
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, messages))
self.set_status(TaskStatus.SUCCESSFUL, *messages)

View file

@ -9,12 +9,8 @@ from django.utils.translation import gettext_lazy as _
from structlog.stdlib import get_logger
from authentik.crypto.models import CertificateKeyPair
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.events.models import TaskStatus
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
from authentik.lib.config import CONFIG
from authentik.root.celery import CELERY_APP
@ -88,8 +84,5 @@ def certificate_discovery(self: MonitoredTask):
if dirty:
cert.save()
self.set_status(
TaskResult(
TaskResultStatus.SUCCESSFUL,
messages=[_("Successfully imported %(count)d files." % {"count": discovered})],
)
TaskStatus.SUCCESSFUL, _("Successfully imported %(count)d files." % {"count": discovered})
)

View file

@ -0,0 +1,101 @@
"""Tasks API"""
from importlib import import_module
from django.contrib import messages
from django.utils.translation import gettext_lazy as _
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema
from rest_framework.decorators import action
from rest_framework.fields import (
CharField,
ChoiceField,
DateTimeField,
ListField,
SerializerMethodField,
)
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ReadOnlyModelViewSet
from structlog.stdlib import get_logger
from authentik.api.decorators import permission_required
from authentik.events.models import SystemTask, TaskStatus
LOGGER = get_logger()
class SystemTaskSerializer(ModelSerializer):
"""Serialize TaskInfo and TaskResult"""
task_name = CharField(source="name")
task_description = CharField(source="description")
task_start_timestamp = DateTimeField(source="start_timestamp")
task_finish_timestamp = DateTimeField(source="finish_timestamp")
task_duration = SerializerMethodField()
status = ChoiceField(
# source="status",
choices=[(x.name, x.name) for x in TaskStatus],
)
messages = ListField(child=CharField())
def get_task_duration(self, instance: SystemTask) -> int:
"""Get the duration a task took to run"""
return max(instance.finish_timestamp.timestamp() - instance.start_timestamp.timestamp(), 0)
class Meta:
model = SystemTask
fields = [
"task_name",
"task_description",
"task_start_timestamp",
"task_finish_timestamp",
"task_duration",
"status",
"messages",
]
class SystemTaskViewSet(ReadOnlyModelViewSet):
"""Read-only view set that returns all background tasks"""
queryset = SystemTask.objects.all()
serializer_class = SystemTaskSerializer
@permission_required(None, ["authentik_events.rerun_task"])
@extend_schema(
request=OpenApiTypes.NONE,
responses={
204: OpenApiResponse(description="Task retried successfully"),
404: OpenApiResponse(description="Task not found"),
500: OpenApiResponse(description="Failed to retry task"),
},
parameters=[
OpenApiParameter(
"id",
type=OpenApiTypes.STR,
location=OpenApiParameter.PATH,
required=True,
),
],
)
@action(detail=True, methods=["post"])
def retry(self, request: Request, pk=None) -> Response:
"""Retry task"""
task = self.get_object()
try:
task_module = import_module(task.task_call_module)
task_func = getattr(task_module, task.task_call_func)
LOGGER.debug("Running task", task=task_func)
task_func.delay(*task.task_call_args, **task.task_call_kwargs)
messages.success(
self.request,
_("Successfully re-scheduled Task %(name)s!" % {"name": task.task_name}),
)
return Response(status=204)
except (ImportError, AttributeError): # pragma: no cover
LOGGER.warning("Failed to run task, remove state", task=task)
# if we get an import error, the module path has probably changed
task.delete()
return Response(status=500)

View file

@ -0,0 +1,55 @@
# Generated by Django 5.0.1 on 2024-01-13 19:38
import uuid
from django.db import migrations, models
import authentik.core.models
class Migration(migrations.Migration):
dependencies = [
("authentik_events", "0002_alter_notificationtransport_mode"),
]
operations = [
migrations.CreateModel(
name="SystemTask",
fields=[
(
"expires",
models.DateTimeField(default=authentik.core.models.default_token_duration),
),
("expiring", models.BooleanField(default=True)),
(
"uuid",
models.UUIDField(
default=uuid.uuid4, editable=False, primary_key=True, serialize=False
),
),
("name", models.TextField()),
("uid", models.TextField(null=True)),
("start_timestamp", models.DateTimeField(auto_now_add=True)),
("finish_timestamp", models.DateTimeField(auto_now=True)),
(
"status",
models.PositiveIntegerField(
choices=[(1, "Successful"), (2, "Warning"), (4, "Error"), (8, "Unknown")]
),
),
("description", models.TextField(null=True)),
("messages", models.JSONField()),
("task_call_module", models.TextField()),
("task_call_func", models.TextField()),
("task_call_args", models.JSONField(default=list)),
("task_call_kwargs", models.JSONField(default=dict)),
],
options={
"verbose_name": "System Task",
"verbose_name_plural": "System Tasks",
"permissions": [("rerun_task", "Rerun task")],
"default_permissions": ["view"],
"unique_together": {("name", "uid")},
},
),
]

View file

@ -4,7 +4,7 @@ from collections import Counter
from datetime import timedelta
from inspect import currentframe
from smtplib import SMTPException
from typing import TYPE_CHECKING, Optional
from typing import Optional
from uuid import uuid4
from django.db import models
@ -18,6 +18,7 @@ from django.http.request import QueryDict
from django.utils.timezone import now
from django.utils.translation import gettext as _
from requests import RequestException
from rest_framework.serializers import Serializer
from structlog.stdlib import get_logger
from authentik import get_full_version
@ -26,6 +27,7 @@ from authentik.core.middleware import (
SESSION_KEY_IMPERSONATE_USER,
)
from authentik.core.models import ExpiringModel, Group, PropertyMapping, User
from authentik.events.apps import GAUGE_TASKS
from authentik.events.context_processors.base import get_context_processors
from authentik.events.utils import (
cleanse_dict,
@ -45,8 +47,6 @@ from authentik.tenants.models import Tenant
from authentik.tenants.utils import DEFAULT_TENANT
LOGGER = get_logger()
if TYPE_CHECKING:
from rest_framework.serializers import Serializer
def default_event_duration():
@ -267,7 +267,7 @@ class Event(SerializerModel, ExpiringModel):
super().save(*args, **kwargs)
@property
def serializer(self) -> "Serializer":
def serializer(self) -> type[Serializer]:
from authentik.events.api.events import EventSerializer
return EventSerializer
@ -475,7 +475,7 @@ class NotificationTransport(SerializerModel):
raise NotificationTransportError(exc) from exc
@property
def serializer(self) -> "Serializer":
def serializer(self) -> type[Serializer]:
from authentik.events.api.notification_transports import NotificationTransportSerializer
return NotificationTransportSerializer
@ -508,7 +508,7 @@ class Notification(SerializerModel):
user = models.ForeignKey(User, on_delete=models.CASCADE)
@property
def serializer(self) -> "Serializer":
def serializer(self) -> type[Serializer]:
from authentik.events.api.notifications import NotificationSerializer
return NotificationSerializer
@ -551,7 +551,7 @@ class NotificationRule(SerializerModel, PolicyBindingModel):
)
@property
def serializer(self) -> "Serializer":
def serializer(self) -> type[Serializer]:
from authentik.events.api.notification_rules import NotificationRuleSerializer
return NotificationRuleSerializer
@ -572,7 +572,7 @@ class NotificationWebhookMapping(PropertyMapping):
return "ak-property-mapping-notification-form"
@property
def serializer(self) -> type["Serializer"]:
def serializer(self) -> type[type[Serializer]]:
from authentik.events.api.notification_mappings import NotificationWebhookMappingSerializer
return NotificationWebhookMappingSerializer
@ -583,3 +583,59 @@ class NotificationWebhookMapping(PropertyMapping):
class Meta:
verbose_name = _("Webhook Mapping")
verbose_name_plural = _("Webhook Mappings")
class TaskStatus(models.IntegerChoices):
"""Possible states of tasks"""
SUCCESSFUL = 1
WARNING = 2
ERROR = 4
UNKNOWN = 8
class SystemTask(SerializerModel, ExpiringModel):
"""Info about a system task running in the background along with details to restart the task"""
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
name = models.TextField()
uid = models.TextField(null=True)
start_timestamp = models.DateTimeField(auto_now_add=True)
finish_timestamp = models.DateTimeField(auto_now=True)
status = models.PositiveIntegerField(choices=TaskStatus.choices)
description = models.TextField(null=True)
messages = models.JSONField()
task_call_module = models.TextField()
task_call_func = models.TextField()
task_call_args = models.JSONField(default=list)
task_call_kwargs = models.JSONField(default=dict)
@property
def serializer(self) -> type[Serializer]:
from authentik.events.api.tasks import SystemTaskSerializer
return SystemTaskSerializer
def update_metrics(self):
"""Update prometheus metrics"""
duration = max(self.finish_timestamp.timestamp() - self.start_timestamp.timestamp(), 0)
GAUGE_TASKS.labels(
task_name=self.name.split(":")[0],
task_uid=self.uid or "",
status=self.status.name.lower(),
).set(duration)
def __str__(self) -> str:
return f"System Task {self.name}"
class Meta:
unique_together = (("name", "uid"),)
# Remove "add", "change" and "delete" permissions as those are not used
default_permissions = ["view"]
permissions = [("rerun_task", _("Rerun task"))]
verbose_name = _("System Task")
verbose_name_plural = _("System Tasks")

View file

@ -1,115 +1,18 @@
"""Monitored tasks"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from datetime import datetime, timedelta, timezone
from timeit import default_timer
from typing import Any, Optional
from celery import Task
from django.core.cache import cache
from django.db import DatabaseError, InternalError, ProgrammingError
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from structlog.stdlib import get_logger
from authentik.events.apps import GAUGE_TASKS
from authentik.events.models import Event, EventAction
from authentik.events.models import Event, EventAction, SystemTask, TaskStatus
from authentik.lib.utils.errors import exception_to_string
LOGGER = get_logger()
CACHE_KEY_PREFIX = "goauthentik.io/events/tasks/"
class TaskResultStatus(Enum):
"""Possible states of tasks"""
SUCCESSFUL = 1
WARNING = 2
ERROR = 4
UNKNOWN = 8
@dataclass
class TaskResult:
"""Result of a task run, this class is created by the task itself
and used by self.set_status"""
status: TaskResultStatus
messages: list[str] = field(default_factory=list)
# Optional UID used in cache for tasks that run in different instances
uid: Optional[str] = field(default=None)
def with_error(self, exc: Exception) -> "TaskResult":
"""Since errors might not always be pickle-able, set the traceback"""
# TODO: Mark exception somehow so that is rendered as <pre> in frontend
self.messages.append(exception_to_string(exc))
return self
@dataclass
class TaskInfo:
"""Info about a task run"""
task_name: str
start_timestamp: float
finish_timestamp: float
finish_time: datetime
result: TaskResult
task_call_module: str
task_call_func: str
task_call_args: list[Any] = field(default_factory=list)
task_call_kwargs: dict[str, Any] = field(default_factory=dict)
task_description: Optional[str] = field(default=None)
@staticmethod
def all() -> dict[str, "TaskInfo"]:
"""Get all TaskInfo objects"""
return cache.get_many(cache.keys(CACHE_KEY_PREFIX + "*"))
@staticmethod
def by_name(name: str) -> Optional["TaskInfo"] | Optional[list["TaskInfo"]]:
"""Get TaskInfo Object by name"""
if "*" in name:
return cache.get_many(cache.keys(CACHE_KEY_PREFIX + name)).values()
return cache.get(CACHE_KEY_PREFIX + name, None)
@property
def full_name(self) -> str:
"""Get the full cache key with task name and UID"""
key = CACHE_KEY_PREFIX + self.task_name
if self.result.uid:
uid_suffix = f":{self.result.uid}"
key += uid_suffix
if not self.task_name.endswith(uid_suffix):
self.task_name += uid_suffix
return key
def delete(self):
"""Delete task info from cache"""
return cache.delete(self.full_name)
def update_metrics(self):
"""Update prometheus metrics"""
start = default_timer()
if hasattr(self, "start_timestamp"):
start = self.start_timestamp
try:
duration = max(self.finish_timestamp - start, 0)
except TypeError:
duration = 0
GAUGE_TASKS.labels(
task_name=self.task_name.split(":")[0],
task_uid=self.result.uid or "",
status=self.result.status.name.lower(),
).set(duration)
def save(self, timeout_hours=6):
"""Save task into cache"""
self.update_metrics()
cache.set(self.full_name, self, timeout=timeout_hours * 60 * 60)
class MonitoredTask(Task):
@ -118,73 +21,94 @@ class MonitoredTask(Task):
# For tasks that should only be listed if they failed, set this to False
save_on_success: bool
_result: Optional[TaskResult]
_status: Optional[TaskStatus]
_messages: list[str]
_uid: Optional[str]
start: Optional[float] = None
_start: Optional[float] = None
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.save_on_success = True
self._uid = None
self._result = None
self._status = None
self._messages = []
self.result_timeout_hours = 6
def set_uid(self, uid: str):
"""Set UID, so in the case of an unexpected error its saved correctly"""
self._uid = uid
def set_status(self, result: TaskResult):
def set_status(self, status: TaskStatus, *messages: str):
"""Set result for current run, will overwrite previous result."""
self._result = result
self._status = status
self._messages = messages
def set_error(self, exception: Exception):
"""Set result to error and save exception"""
self._status = TaskStatus.ERROR
self._messages = [exception_to_string(exception)]
def before_start(self, task_id, args, kwargs):
self.start = default_timer()
self._start = default_timer()
return super().before_start(task_id, args, kwargs)
# pylint: disable=too-many-arguments
def after_return(self, status, retval, task_id, args: list[Any], kwargs: dict[str, Any], einfo):
super().after_return(status, retval, task_id, args, kwargs, einfo=einfo)
if not self._result:
if not self._status:
return
if not self._result.uid:
self._result.uid = self._uid
info = TaskInfo(
task_name=self.__name__,
task_description=self.__doc__,
start_timestamp=self.start or default_timer(),
finish_timestamp=default_timer(),
finish_time=datetime.now(),
result=self._result,
task_call_module=self.__module__,
task_call_func=self.__name__,
task_call_args=args,
task_call_kwargs=kwargs,
if self._status == TaskStatus.SUCCESSFUL and not self.save_on_success:
SystemTask.objects.filter(
name=self.__name__,
uid=self._uid,
).delete()
return
SystemTask.objects.update_or_create(
name=self.__name__,
uid=self._uid,
defaults={
"description": self.__doc__,
"start_timestamp": datetime.fromtimestamp(
self._start or default_timer(), tz=timezone.utc
),
"finish_timestamp": datetime.fromtimestamp(default_timer(), tz=timezone.utc),
"task_call_module": self.__module__,
"task_call_func": self.__name__,
"task_call_args": args,
"task_call_kwargs": kwargs,
"status": self._status,
"messages": self._messages,
"expires": now() + timedelta(hours=self.result_timeout_hours),
"expiring": True,
},
)
if self._result.status == TaskResultStatus.SUCCESSFUL and not self.save_on_success:
info.delete()
return
info.save(self.result_timeout_hours)
# pylint: disable=too-many-arguments
def on_failure(self, exc, task_id, args, kwargs, einfo):
super().on_failure(exc, task_id, args, kwargs, einfo=einfo)
if not self._result:
self._result = TaskResult(status=TaskResultStatus.ERROR, messages=[str(exc)])
if not self._result.uid:
self._result.uid = self._uid
TaskInfo(
task_name=self.__name__,
task_description=self.__doc__,
start_timestamp=self.start or default_timer(),
finish_timestamp=default_timer(),
finish_time=datetime.now(),
result=self._result,
task_call_module=self.__module__,
task_call_func=self.__name__,
task_call_args=args,
task_call_kwargs=kwargs,
).save(self.result_timeout_hours)
if not self._status:
self._status = TaskStatus.ERROR
self._messages = exception_to_string(exc)
SystemTask.objects.update_or_create(
name=self.__name__,
uid=self._uid,
defaults={
"description": self.__doc__,
"start_timestamp": datetime.fromtimestamp(
self._start or default_timer(), tz=timezone.utc
),
"finish_timestamp": datetime.fromtimestamp(default_timer(), tz=timezone.utc),
"task_call_module": self.__module__,
"task_call_func": self.__name__,
"task_call_args": args,
"task_call_kwargs": kwargs,
"status": self._status,
"messages": self._messages,
"expires": now() + timedelta(hours=self.result_timeout_hours),
"expiring": True,
},
)
Event.new(
EventAction.SYSTEM_TASK_EXCEPTION,
message=f"Task {self.__name__} encountered an error: {exception_to_string(exc)}",
@ -196,19 +120,20 @@ class MonitoredTask(Task):
def prefill_task(func):
"""Ensure a task's details are always in cache, so it can always be triggered via API"""
status = TaskInfo.by_name(func.__name__)
try:
status = SystemTask.objects.filter(name=func.__name__).first()
except (DatabaseError, InternalError, ProgrammingError):
return func
if status:
return func
TaskInfo(
task_name=func.__name__,
task_description=func.__doc__,
result=TaskResult(TaskResultStatus.UNKNOWN, messages=[_("Task has not been run yet.")]),
SystemTask.objects.create(
name=func.__name__,
description=func.__doc__,
status=TaskStatus.UNKNOWN,
messages=[_("Task has not been run yet.")],
task_call_module=func.__module__,
task_call_func=func.__name__,
# We don't have real values for these attributes but they cannot be null
start_timestamp=0,
finish_timestamp=0,
finish_time=datetime.now(),
).save(86400)
expiring=False,
)
LOGGER.debug("prefilled task", task_name=func.__name__)
return func

View file

@ -8,12 +8,13 @@ from django.http import HttpRequest
from authentik.core.models import User
from authentik.core.signals import login_failed, password_changed
from authentik.events.models import Event, EventAction
from authentik.events.models import Event, EventAction, SystemTask
from authentik.events.tasks import event_notification_handler, gdpr_cleanup
from authentik.flows.models import Stage
from authentik.flows.planner import PLAN_CONTEXT_SOURCE, FlowPlan
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.config import CONFIG
from authentik.root.monitoring import monitoring_set
from authentik.stages.invitation.models import Invitation
from authentik.stages.invitation.signals import invitation_used
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
@ -100,3 +101,10 @@ def event_user_pre_delete_cleanup(sender, instance: User, **_):
"""If gdpr_compliance is enabled, remove all the user's events"""
if CONFIG.get_bool("gdpr_compliance", True):
gdpr_cleanup.delay(instance.pk)
@receiver(monitoring_set)
def monitoring_system_task(sender, **_):
"""Update metrics when task is saved"""
for task in SystemTask.objects.all():
task.update_metrics()

View file

@ -13,13 +13,9 @@ from authentik.events.models import (
NotificationRule,
NotificationTransport,
NotificationTransportError,
TaskStatus,
)
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode
from authentik.root.celery import CELERY_APP
@ -123,9 +119,9 @@ def notification_transport(
if not transport:
return
transport.send(notification)
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL))
self.set_status(TaskStatus.SUCCESSFUL)
except (NotificationTransportError, PropertyMappingExpressionException) as exc:
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)
raise exc
@ -146,4 +142,4 @@ def notification_cleanup(self: MonitoredTask):
for notification in notifications:
notification.delete()
LOGGER.debug("Expired notifications", amount=amount)
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, [f"Expired {amount} Notifications"]))
self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications")

View file

@ -1,7 +1,8 @@
"""Test Monitored tasks"""
from django.test import TestCase
from authentik.events.monitored_tasks import MonitoredTask, TaskInfo, TaskResult, TaskResultStatus
from authentik.events.models import SystemTask, TaskStatus
from authentik.events.monitored_tasks import MonitoredTask
from authentik.lib.generators import generate_id
from authentik.root.celery import CELERY_APP
@ -22,22 +23,20 @@ class TestMonitoredTasks(TestCase):
def test_task(self: MonitoredTask):
self.save_on_success = False
self.set_uid(uid)
self.set_status(
TaskResult(TaskResultStatus.ERROR if should_fail else TaskResultStatus.SUCCESSFUL)
)
self.set_status(TaskStatus.ERROR if should_fail else TaskStatus.SUCCESSFUL)
# First test successful run
should_fail = False
test_task.delay().get()
self.assertIsNone(TaskInfo.by_name(f"test_task:{uid}"))
self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid))
# Then test failed
should_fail = True
test_task.delay().get()
info = TaskInfo.by_name(f"test_task:{uid}")
self.assertEqual(info.result.status, TaskResultStatus.ERROR)
info = SystemTask.objects.filter(name="test_task", uid=uid)
self.assertEqual(info.status, TaskStatus.ERROR)
# Then after that, the state should be removed
should_fail = False
test_task.delay().get()
self.assertIsNone(TaskInfo.by_name(f"test_task:{uid}"))
self.assertIsNone(SystemTask.objects.filter(name="test_task", uid=uid))

View file

@ -4,11 +4,13 @@ from authentik.events.api.notification_mappings import NotificationWebhookMappin
from authentik.events.api.notification_rules import NotificationRuleViewSet
from authentik.events.api.notification_transports import NotificationTransportViewSet
from authentik.events.api.notifications import NotificationViewSet
from authentik.events.api.tasks import SystemTaskViewSet
api_urlpatterns = [
("events/events", EventViewSet),
("events/notifications", NotificationViewSet),
("events/transports", NotificationTransportViewSet),
("events/rules", NotificationRuleViewSet),
("events/system_tasks", SystemTaskViewSet),
("propertymappings/notification", NotificationWebhookMappingViewSet),
]

View file

@ -19,12 +19,8 @@ from yaml import safe_load
from authentik.enterprise.providers.rac.controllers.docker import RACDockerController
from authentik.enterprise.providers.rac.controllers.kubernetes import RACKubernetesController
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.events.models import TaskStatus
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
from authentik.lib.config import CONFIG
from authentik.lib.utils.reflection import path_to_class
from authentik.outposts.consumer import OUTPOST_GROUP
@ -118,10 +114,8 @@ def outpost_service_connection_monitor(self: MonitoredTask):
for connection in connections.iterator():
outpost_service_connection_state.delay(connection.pk)
self.set_status(
TaskResult(
TaskResultStatus.SUCCESSFUL,
[f"Successfully updated {len(connections)} connections."],
)
TaskStatus.SUCCESSFUL,
f"Successfully updated {len(connections)} connections.",
)
@ -161,11 +155,11 @@ def outpost_controller(
LOGGER.debug(log)
LOGGER.debug("-----------------Outpost Controller logs end-------------------")
except (ControllerException, ServiceConnectionInvalid) as exc:
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)
else:
if from_cache:
cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, logs))
self.set_status(TaskStatus.SUCCESSFUL, *logs)
@CELERY_APP.task(bind=True, base=MonitoredTask)
@ -178,10 +172,8 @@ def outpost_token_ensurer(self: MonitoredTask):
_ = outpost.token
outpost.build_user_permissions(outpost.user)
self.set_status(
TaskResult(
TaskResultStatus.SUCCESSFUL,
[f"Successfully checked {len(all_outposts)} Outposts."],
)
TaskStatus.SUCCESSFUL,
f"Successfully checked {len(all_outposts)} Outposts.",
)
@ -261,27 +253,27 @@ def _outpost_single_update(outpost: Outpost, layer=None):
)
def outpost_connection_discovery(self: MonitoredTask):
"""Checks the local environment and create Service connections."""
status = TaskResult(TaskResultStatus.SUCCESSFUL)
messages = []
if not CONFIG.get_bool("outposts.discover"):
status.messages.append("Outpost integration discovery is disabled")
self.set_status(status)
messages.append("Outpost integration discovery is disabled")
self.set_status(TaskStatus.SUCCESSFUL, *messages)
return
# Explicitly check against token filename, as that's
# only present when the integration is enabled
if Path(SERVICE_TOKEN_FILENAME).exists():
status.messages.append("Detected in-cluster Kubernetes Config")
messages.append("Detected in-cluster Kubernetes Config")
if not KubernetesServiceConnection.objects.filter(local=True).exists():
status.messages.append("Created Service Connection for in-cluster")
messages.append("Created Service Connection for in-cluster")
KubernetesServiceConnection.objects.create(
name="Local Kubernetes Cluster", local=True, kubeconfig={}
)
# For development, check for the existence of a kubeconfig file
kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser()
if kubeconfig_path.exists():
status.messages.append("Detected kubeconfig")
messages.append("Detected kubeconfig")
kubeconfig_local_name = f"k8s-{gethostname()}"
if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
status.messages.append("Creating kubeconfig Service Connection")
messages.append("Creating kubeconfig Service Connection")
with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
KubernetesServiceConnection.objects.create(
name=kubeconfig_local_name,
@ -290,12 +282,12 @@ def outpost_connection_discovery(self: MonitoredTask):
unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path
socket = Path(unix_socket_path)
if socket.exists() and access(socket, R_OK):
status.messages.append("Detected local docker socket")
messages.append("Detected local docker socket")
if len(DockerServiceConnection.objects.filter(local=True)) == 0:
status.messages.append("Created Service Connection for docker")
messages.append("Created Service Connection for docker")
DockerServiceConnection.objects.create(
name="Local Docker connection",
local=True,
url=unix_socket_path,
)
self.set_status(status)
self.set_status(TaskStatus.SUCCESSFUL, *messages)

View file

@ -4,12 +4,8 @@ from structlog.stdlib import get_logger
from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
from authentik.events.monitored_tasks import (
MonitoredTask,
TaskResult,
TaskResultStatus,
prefill_task,
)
from authentik.events.models import TaskStatus
from authentik.events.monitored_tasks import MonitoredTask, prefill_task
from authentik.policies.reputation.models import Reputation
from authentik.policies.reputation.signals import CACHE_KEY_PREFIX
from authentik.root.celery import CELERY_APP
@ -32,4 +28,4 @@ def save_reputation(self: MonitoredTask):
rep.score = score["score"]
objects_to_update.append(rep)
Reputation.objects.bulk_update(objects_to_update, ["score", "ip_geo_data"])
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated Reputation"]))
self.set_status(TaskStatus.SUCCESSFUL, "Successfully updated Reputation")

View file

@ -1,17 +1,17 @@
"""SCIM Provider API Views"""
from django.utils.text import slugify
from drf_spectacular.utils import OpenApiResponse, extend_schema
from guardian.shortcuts import get_objects_for_user
from rest_framework.decorators import action
from rest_framework.fields import BooleanField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from authentik.admin.api.tasks import TaskSerializer
from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import PassiveSerializer
from authentik.events.monitored_tasks import TaskInfo
from authentik.events.api.tasks import SystemTaskSerializer
from authentik.providers.scim.models import SCIMProvider
@ -43,7 +43,7 @@ class SCIMSyncStatusSerializer(PassiveSerializer):
"""SCIM Provider sync status"""
is_running = BooleanField(read_only=True)
tasks = TaskSerializer(many=True, read_only=True)
tasks = SystemTaskSerializer(many=True, read_only=True)
class SCIMProviderViewSet(UsedByMixin, ModelViewSet):
@ -65,8 +65,12 @@ class SCIMProviderViewSet(UsedByMixin, ModelViewSet):
def sync_status(self, request: Request, pk: int) -> Response:
"""Get provider's sync status"""
provider: SCIMProvider = self.get_object()
task = TaskInfo.by_name(f"scim_sync:{slugify(provider.name)}")
tasks = [task] if task else []
tasks = list(
get_objects_for_user(request.user, "authentik_events.view_systemtask").filter(
name="scim_sync",
uid=slugify(provider.name),
)
)
status = {
"tasks": tasks,
"is_running": provider.sync_lock.locked(),

View file

@ -10,7 +10,8 @@ from pydanticscim.responses import PatchOp
from structlog.stdlib import get_logger
from authentik.core.models import Group, User
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.models import TaskStatus
from authentik.events.monitored_tasks import MonitoredTask
from authentik.lib.utils.reflection import path_to_class
from authentik.providers.scim.clients import PAGE_SIZE, PAGE_TIMEOUT
from authentik.providers.scim.clients.base import SCIMClient
@ -52,8 +53,8 @@ def scim_sync(self: MonitoredTask, provider_pk: int) -> None:
LOGGER.debug("SCIM sync locked, skipping task", source=provider.name)
return
self.set_uid(slugify(provider.name))
result = TaskResult(TaskResultStatus.SUCCESSFUL, [])
result.messages.append(_("Starting full SCIM sync"))
messages = []
messages.append(_("Starting full SCIM sync"))
LOGGER.debug("Starting SCIM sync")
users_paginator = Paginator(provider.get_user_qs(), PAGE_SIZE)
groups_paginator = Paginator(provider.get_group_qs(), PAGE_SIZE)
@ -63,17 +64,17 @@ def scim_sync(self: MonitoredTask, provider_pk: int) -> None:
with allow_join_result():
try:
for page in users_paginator.page_range:
result.messages.append(_("Syncing page %(page)d of users" % {"page": page}))
messages.append(_("Syncing page %(page)d of users" % {"page": page}))
for msg in scim_sync_users.delay(page, provider_pk).get():
result.messages.append(msg)
messages.append(msg)
for page in groups_paginator.page_range:
result.messages.append(_("Syncing page %(page)d of groups" % {"page": page}))
messages.append(_("Syncing page %(page)d of groups" % {"page": page}))
for msg in scim_sync_group.delay(page, provider_pk).get():
result.messages.append(msg)
messages.append(msg)
except StopSync as exc:
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)
return
self.set_status(result)
self.set_status(TaskStatus.SUCCESSFUL, *messages)
@CELERY_APP.task(

View file

@ -6,6 +6,7 @@ from django_filters.filters import AllValuesMultipleFilter
from django_filters.filterset import FilterSet
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, extend_schema_field, inline_serializer
from guardian.shortcuts import get_objects_for_user
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import BooleanField, DictField, ListField, SerializerMethodField
@ -14,13 +15,12 @@ from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from authentik.admin.api.tasks import TaskSerializer
from authentik.core.api.propertymappings import PropertyMappingSerializer
from authentik.core.api.sources import SourceSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import PassiveSerializer
from authentik.crypto.models import CertificateKeyPair
from authentik.events.monitored_tasks import TaskInfo
from authentik.events.api.tasks import SystemTaskSerializer
from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
from authentik.sources.ldap.tasks import CACHE_KEY_STATUS, SYNC_CLASSES
@ -91,7 +91,7 @@ class LDAPSyncStatusSerializer(PassiveSerializer):
"""LDAP Source sync status"""
is_running = BooleanField(read_only=True)
tasks = TaskSerializer(many=True, read_only=True)
tasks = SystemTaskSerializer(many=True, read_only=True)
class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
@ -136,7 +136,12 @@ class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
def sync_status(self, request: Request, slug: str) -> Response:
"""Get source's sync status"""
source: LDAPSource = self.get_object()
tasks = TaskInfo.by_name(f"ldap_sync:{source.slug}:*") or []
tasks = list(
get_objects_for_user(request.user, "authentik_events.view_systemtask").filter(
name="ldap_sync",
uid__startswith=source.slug,
)
)
status = {
"tasks": tasks,
"is_running": source.sync_lock.locked(),

View file

@ -1,9 +1,7 @@
"""FreeIPA specific"""
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Generator
from pytz import UTC
from authentik.core.models import User
from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer, flatten
@ -27,7 +25,7 @@ class FreeIPA(BaseLDAPSynchronizer):
if "krbLastPwdChange" not in attributes:
return
pwd_last_set: datetime = attributes.get("krbLastPwdChange", datetime.now())
pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
pwd_last_set = pwd_last_set.replace(tzinfo=timezone.utc)
if created or pwd_last_set >= user.password_change_date:
self.message(f"'{user.username}': Reset user's password")
self._logger.debug(

View file

@ -1,10 +1,8 @@
"""Active Directory specific"""
from datetime import datetime
from datetime import datetime, timezone
from enum import IntFlag
from typing import Any, Generator
from pytz import UTC
from authentik.core.models import User
from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
@ -58,7 +56,7 @@ class MicrosoftActiveDirectory(BaseLDAPSynchronizer):
if "pwdLastSet" not in attributes:
return
pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now())
pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
pwd_last_set = pwd_last_set.replace(tzinfo=timezone.utc)
if created or pwd_last_set >= user.password_change_date:
self.message(f"'{user.username}': Reset user's password")
self._logger.debug(

View file

@ -8,8 +8,8 @@ from ldap3.core.exceptions import LDAPException
from redis.exceptions import LockError
from structlog.stdlib import get_logger
from authentik.events.monitored_tasks import CACHE_KEY_PREFIX as CACHE_KEY_PREFIX_TASKS
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.models import SystemTask, TaskStatus
from authentik.events.monitored_tasks import MonitoredTask
from authentik.lib.config import CONFIG
from authentik.lib.utils.errors import exception_to_string
from authentik.lib.utils.reflection import class_to_path, path_to_class
@ -69,8 +69,7 @@ def ldap_sync_single(source_pk: str):
try:
with lock:
# Delete all sync tasks from the cache
keys = cache.keys(f"{CACHE_KEY_PREFIX_TASKS}ldap_sync:{source.slug}*")
cache.delete_many(keys)
SystemTask.objects.filter(name="ldap_sync", uid__startswith=source.slug).delete()
task = chain(
# User and group sync can happen at once, they have no dependencies on each other
group(
@ -127,20 +126,18 @@ def ldap_sync(self: MonitoredTask, source_pk: str, sync_class: str, page_cache_k
+ "Try increasing ldap.task_timeout_hours"
)
LOGGER.warning(error_message)
self.set_status(TaskResult(TaskResultStatus.ERROR, [error_message]))
self.set_status(TaskStatus.ERROR, error_message)
return
cache.touch(page_cache_key)
count = sync_inst.sync(page)
messages = sync_inst.messages
messages.append(f"Synced {count} objects.")
self.set_status(
TaskResult(
TaskResultStatus.SUCCESSFUL,
messages,
)
TaskStatus.SUCCESSFUL,
*messages,
)
cache.delete(page_cache_key)
except LDAPException as exc:
# No explicit event is created here as .set_status with an error will do that
LOGGER.warning(exception_to_string(exc))
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)

View file

@ -7,8 +7,8 @@ from django.test import TestCase
from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import Group, User
from authentik.core.tests.utils import create_test_admin_user
from authentik.events.models import Event, EventAction
from authentik.events.monitored_tasks import TaskInfo, TaskResultStatus
from authentik.events.models import Event, EventAction, SystemTask
from authentik.events.monitored_tasks import TaskStatus
from authentik.lib.generators import generate_id, generate_key
from authentik.lib.utils.reflection import class_to_path
from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
@ -41,8 +41,8 @@ class LDAPSyncTests(TestCase):
connection = MagicMock(return_value=mock_ad_connection(LDAP_PASSWORD))
with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
ldap_sync.delay(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo").get()
status = TaskInfo.by_name("ldap_sync:ldap:users:foo")
self.assertEqual(status.result.status, TaskResultStatus.ERROR)
task = SystemTask.objects.filter(name="ldap_sync", uid="ldap:users:foo").first()
self.assertEqual(task.status, TaskStatus.ERROR)
def test_sync_error(self):
"""Test user sync"""

View file

@ -4,7 +4,8 @@ from json import dumps
from requests import RequestException
from structlog.stdlib import get_logger
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.models import TaskStatus
from authentik.events.monitored_tasks import MonitoredTask
from authentik.lib.utils.http import get_http_session
from authentik.root.celery import CELERY_APP
from authentik.sources.oauth.models import OAuthSource
@ -16,7 +17,7 @@ LOGGER = get_logger()
def update_well_known_jwks(self: MonitoredTask):
"""Update OAuth sources' config from well_known, and JWKS info from the configured URL"""
session = get_http_session()
result = TaskResult(TaskResultStatus.SUCCESSFUL, [])
messages = []
for source in OAuthSource.objects.all().exclude(oidc_well_known_url=""):
try:
well_known_config = session.get(source.oidc_well_known_url)
@ -24,7 +25,7 @@ def update_well_known_jwks(self: MonitoredTask):
except RequestException as exc:
text = exc.response.text if exc.response else str(exc)
LOGGER.warning("Failed to update well_known", source=source, exc=exc, text=text)
result.messages.append(f"Failed to update OIDC configuration for {source.slug}")
messages.append(f"Failed to update OIDC configuration for {source.slug}")
continue
config = well_known_config.json()
try:
@ -47,7 +48,7 @@ def update_well_known_jwks(self: MonitoredTask):
source=source,
exc=exc,
)
result.messages.append(f"Failed to update OIDC configuration for {source.slug}")
messages.append(f"Failed to update OIDC configuration for {source.slug}")
continue
if dirty:
LOGGER.info("Updating sources' OpenID Configuration", source=source)
@ -60,11 +61,11 @@ def update_well_known_jwks(self: MonitoredTask):
except RequestException as exc:
text = exc.response.text if exc.response else str(exc)
LOGGER.warning("Failed to update JWKS", source=source, exc=exc, text=text)
result.messages.append(f"Failed to update JWKS for {source.slug}")
messages.append(f"Failed to update JWKS for {source.slug}")
continue
config = jwks_config.json()
if dumps(source.oidc_jwks, sort_keys=True) != dumps(config, sort_keys=True):
source.oidc_jwks = config
LOGGER.info("Updating sources' JWKS", source=source)
source.save()
self.set_status(result)
self.set_status(TaskStatus.SUCCESSFUL, *messages)

View file

@ -1,8 +1,8 @@
"""Plex tasks"""
from requests import RequestException
from authentik.events.models import Event, EventAction
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.models import Event, EventAction, TaskStatus
from authentik.events.monitored_tasks import MonitoredTask
from authentik.lib.utils.errors import exception_to_string
from authentik.root.celery import CELERY_APP
from authentik.sources.plex.models import PlexSource
@ -27,16 +27,15 @@ def check_plex_token(self: MonitoredTask, source_slug: int):
auth = PlexAuth(source, source.plex_token)
try:
auth.get_user_info()
self.set_status(TaskResult(TaskResultStatus.SUCCESSFUL, ["Plex token is valid."]))
self.set_status(TaskStatus.SUCCESSFUL, "Plex token is valid.")
except RequestException as exc:
error = exception_to_string(exc)
if len(source.plex_token) > 0:
error = error.replace(source.plex_token, "$PLEX_TOKEN")
self.set_status(
TaskResult(
TaskResultStatus.ERROR,
["Plex token is invalid/an error occurred:", error],
)
TaskStatus.ERROR,
"Plex token is invalid/an error occurred:",
error,
)
Event.new(
EventAction.CONFIGURATION_ERROR,

View file

@ -9,8 +9,8 @@ from django.core.mail.utils import DNS_NAME
from django.utils.text import slugify
from structlog.stdlib import get_logger
from authentik.events.models import Event, EventAction
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.events.models import Event, EventAction, TaskStatus
from authentik.events.monitored_tasks import MonitoredTask
from authentik.root.celery import CELERY_APP
from authentik.stages.email.models import EmailStage
from authentik.stages.email.utils import logo_data
@ -58,10 +58,8 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti
stages = EmailStage.objects.filter(pk=email_stage_pk)
if not stages.exists():
self.set_status(
TaskResult(
TaskResultStatus.WARNING,
messages=["Email stage does not exist anymore. Discarding message."],
)
TaskStatus.WARNING,
"Email stage does not exist anymore. Discarding message.",
)
return
stage: EmailStage = stages.first()
@ -69,7 +67,7 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti
backend = stage.backend
except ValueError as exc:
LOGGER.warning("failed to get email backend", exc=exc)
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)
return
backend.open()
# Since django's EmailMessage objects are not JSON serialisable,
@ -97,12 +95,10 @@ def send_mail(self: MonitoredTask, message: dict[Any, Any], email_stage_pk: Opti
to_email=message_object.to,
).save()
self.set_status(
TaskResult(
TaskResultStatus.SUCCESSFUL,
messages=["Successfully sent Mail."],
)
TaskStatus.SUCCESSFUL,
"Successfully sent Mail.",
)
except (SMTPException, ConnectionError, OSError) as exc:
LOGGER.debug("Error sending email, retrying...", exc=exc)
self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
self.set_error(exc)
raise exc

View file

@ -263,6 +263,43 @@
}
}
},
{
"type": "object",
"required": [
"model",
"identifiers"
],
"properties": {
"model": {
"const": "authentik_events.systemtask"
},
"id": {
"type": "string"
},
"state": {
"type": "string",
"enum": [
"absent",
"present",
"created",
"must_created"
],
"default": "present"
},
"conditions": {
"type": "array",
"items": {
"type": "boolean"
}
},
"attrs": {
"$ref": "#/$defs/model_authentik_events.systemtask"
},
"identifiers": {
"$ref": "#/$defs/model_authentik_events.systemtask"
}
}
},
{
"type": "object",
"required": [
@ -3197,6 +3234,50 @@
},
"required": []
},
"model_authentik_events.systemtask": {
"type": "object",
"properties": {
"task_name": {
"type": "string",
"minLength": 1,
"title": "Task name"
},
"task_description": {
"type": "string",
"minLength": 1,
"title": "Task description"
},
"task_start_timestamp": {
"type": "string",
"format": "date-time",
"title": "Task start timestamp"
},
"task_finish_timestamp": {
"type": "string",
"format": "date-time",
"title": "Task finish timestamp"
},
"status": {
"type": "string",
"enum": [
"SUCCESSFUL",
"WARNING",
"ERROR",
"UNKNOWN"
],
"title": "Status"
},
"messages": {
"type": "array",
"items": {
"type": "string",
"minLength": 1
},
"title": "Messages"
}
},
"required": []
},
"model_authentik_flows.flow": {
"type": "object",
"properties": {
@ -3607,6 +3688,7 @@
"authentik_events.notification",
"authentik_events.notificationrule",
"authentik_events.notificationwebhookmapping",
"authentik_events.systemtask",
"authentik_flows.flow",
"authentik_flows.flowstagebinding",
"authentik_outposts.dockerserviceconnection",

View file

@ -147,103 +147,6 @@ paths:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/admin/system_tasks/:
get:
operationId: admin_system_tasks_list
description: List system tasks
tags:
- admin
security:
- authentik: []
responses:
'200':
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/Task'
description: ''
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
description: ''
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/admin/system_tasks/{id}/:
get:
operationId: admin_system_tasks_retrieve
description: Get a single system task
parameters:
- in: path
name: id
schema:
type: string
required: true
tags:
- admin
security:
- authentik: []
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/Task'
description: ''
'404':
description: Task not found
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
description: ''
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/admin/system_tasks/{id}/retry/:
post:
operationId: admin_system_tasks_retry_create
description: Retry task
parameters:
- in: path
name: id
schema:
type: string
required: true
tags:
- admin
security:
- authentik: []
responses:
'204':
description: Task retried successfully
'404':
description: Task not found
'500':
description: Failed to retry task
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
description: ''
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/admin/version/:
get:
operationId: admin_version_retrieve
@ -6932,6 +6835,133 @@ paths:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/events/system_tasks/:
get:
operationId: events_system_tasks_list
description: Read-only view set that returns all background tasks
parameters:
- name: ordering
required: false
in: query
description: Which field to use when ordering the results.
schema:
type: string
- name: page
required: false
in: query
description: A page number within the paginated result set.
schema:
type: integer
- name: page_size
required: false
in: query
description: Number of results to return per page.
schema:
type: integer
- name: search
required: false
in: query
description: A search term.
schema:
type: string
tags:
- events
security:
- authentik: []
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/PaginatedSystemTaskList'
description: ''
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
description: ''
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/events/system_tasks/{uuid}/:
get:
operationId: events_system_tasks_retrieve
description: Read-only view set that returns all background tasks
parameters:
- in: path
name: uuid
schema:
type: string
format: uuid
description: A UUID string identifying this System Task.
required: true
tags:
- events
security:
- authentik: []
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/SystemTask'
description: ''
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
description: ''
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/events/system_tasks/{uuid}/retry/:
post:
operationId: events_system_tasks_retry_create
description: Retry task
parameters:
- in: path
name: id
schema:
type: string
required: true
- in: path
name: uuid
schema:
type: string
format: uuid
description: A UUID string identifying this System Task.
required: true
tags:
- events
security:
- authentik: []
responses:
'204':
description: Task retried successfully
'404':
description: Task not found
'500':
description: Failed to retry task
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
description: ''
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
description: ''
/events/transports/:
get:
operationId: events_transports_list
@ -18071,6 +18101,7 @@ paths:
- authentik_events.notificationrule
- authentik_events.notificationtransport
- authentik_events.notificationwebhookmapping
- authentik_events.systemtask
- authentik_flows.flow
- authentik_flows.flowstagebinding
- authentik_outposts.dockerserviceconnection
@ -18143,6 +18174,7 @@ paths:
* `authentik_events.notification` - Notification
* `authentik_events.notificationrule` - Notification Rule
* `authentik_events.notificationwebhookmapping` - Webhook Mapping
* `authentik_events.systemtask` - System Task
* `authentik_flows.flow` - Flow
* `authentik_flows.flowstagebinding` - Flow Stage Binding
* `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@ -18365,6 +18397,7 @@ paths:
- authentik_events.notificationrule
- authentik_events.notificationtransport
- authentik_events.notificationwebhookmapping
- authentik_events.systemtask
- authentik_flows.flow
- authentik_flows.flowstagebinding
- authentik_outposts.dockerserviceconnection
@ -18437,6 +18470,7 @@ paths:
* `authentik_events.notification` - Notification
* `authentik_events.notificationrule` - Notification Rule
* `authentik_events.notificationwebhookmapping` - Webhook Mapping
* `authentik_events.systemtask` - System Task
* `authentik_flows.flow` - Flow
* `authentik_flows.flowstagebinding` - Flow Stage Binding
* `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@ -31696,6 +31730,7 @@ components:
* `authentik_events.notification` - Notification
* `authentik_events.notificationrule` - Notification Rule
* `authentik_events.notificationwebhookmapping` - Webhook Mapping
* `authentik_events.systemtask` - System Task
* `authentik_flows.flow` - Flow
* `authentik_flows.flowstagebinding` - Flow Stage Binding
* `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@ -31896,6 +31931,7 @@ components:
* `authentik_events.notification` - Notification
* `authentik_events.notificationrule` - Notification Rule
* `authentik_events.notificationwebhookmapping` - Webhook Mapping
* `authentik_events.systemtask` - System Task
* `authentik_flows.flow` - Flow
* `authentik_flows.flowstagebinding` - Flow Stage Binding
* `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@ -34047,7 +34083,7 @@ components:
tasks:
type: array
items:
$ref: '#/components/schemas/Task'
$ref: '#/components/schemas/SystemTask'
readOnly: true
required:
- is_running
@ -34214,6 +34250,7 @@ components:
- authentik_events.notification
- authentik_events.notificationrule
- authentik_events.notificationwebhookmapping
- authentik_events.systemtask
- authentik_flows.flow
- authentik_flows.flowstagebinding
- authentik_outposts.dockerserviceconnection
@ -34293,6 +34330,7 @@ components:
* `authentik_events.notification` - Notification
* `authentik_events.notificationrule` - Notification Rule
* `authentik_events.notificationwebhookmapping` - Webhook Mapping
* `authentik_events.systemtask` - System Task
* `authentik_flows.flow` - Flow
* `authentik_flows.flowstagebinding` - Flow Stage Binding
* `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@ -36281,6 +36319,18 @@ components:
required:
- pagination
- results
PaginatedSystemTaskList:
type: object
properties:
pagination:
$ref: '#/components/schemas/Pagination'
results:
type: array
items:
$ref: '#/components/schemas/SystemTask'
required:
- pagination
- results
PaginatedTOTPDeviceList:
type: object
properties:
@ -37431,6 +37481,7 @@ components:
* `authentik_events.notification` - Notification
* `authentik_events.notificationrule` - Notification Rule
* `authentik_events.notificationwebhookmapping` - Webhook Mapping
* `authentik_events.systemtask` - System Task
* `authentik_flows.flow` - Flow
* `authentik_flows.flowstagebinding` - Flow Stage Binding
* `authentik_outposts.dockerserviceconnection` - Docker Service-Connection
@ -41972,7 +42023,7 @@ components:
tasks:
type: array
items:
$ref: '#/components/schemas/Task'
$ref: '#/components/schemas/SystemTask'
readOnly: true
required:
- is_running
@ -42630,6 +42681,50 @@ components:
- runtime
- server_time
- tenant
SystemTask:
type: object
description: Serialize TaskInfo and TaskResult
properties:
task_name:
type: string
task_description:
type: string
task_start_timestamp:
type: string
format: date-time
task_finish_timestamp:
type: string
format: date-time
task_duration:
type: integer
description: Get the duration a task took to run
readOnly: true
status:
$ref: '#/components/schemas/SystemTaskStatusEnum'
messages:
type: array
items:
type: string
required:
- messages
- status
- task_description
- task_duration
- task_finish_timestamp
- task_name
- task_start_timestamp
SystemTaskStatusEnum:
enum:
- SUCCESSFUL
- WARNING
- ERROR
- UNKNOWN
type: string
description: |-
* `SUCCESSFUL` - SUCCESSFUL
* `WARNING` - WARNING
* `ERROR` - ERROR
* `UNKNOWN` - UNKNOWN
TOTPDevice:
type: object
description: Serializer for totp authenticator devices
@ -42656,45 +42751,6 @@ components:
maxLength: 64
required:
- name
Task:
type: object
description: Serialize TaskInfo and TaskResult
properties:
task_name:
type: string
task_description:
type: string
task_finish_timestamp:
type: string
format: date-time
task_duration:
type: integer
description: Get the duration a task took to run
readOnly: true
status:
$ref: '#/components/schemas/TaskStatusEnum'
messages:
type: array
items: {}
required:
- messages
- status
- task_description
- task_duration
- task_finish_timestamp
- task_name
TaskStatusEnum:
enum:
- SUCCESSFUL
- WARNING
- ERROR
- UNKNOWN
type: string
description: |-
* `SUCCESSFUL` - SUCCESSFUL
* `WARNING` - WARNING
* `ERROR` - ERROR
* `UNKNOWN` - UNKNOWN
Tenant:
type: object
description: Tenant Serializer