sources/ldap: bump timeout, run each sync component in its own task
closes #1411 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
9257f3c919
commit
06af306e8a
|
@ -1,5 +1,6 @@
|
|||
"""authentik lib reflection utilities"""
|
||||
from importlib import import_module
|
||||
from typing import Union
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
|
@ -19,12 +20,12 @@ def all_subclasses(cls, sort=True):
|
|||
return classes
|
||||
|
||||
|
||||
def class_to_path(cls):
|
||||
def class_to_path(cls: type) -> str:
|
||||
"""Turn Class (Class or instance) into module path"""
|
||||
return f"{cls.__module__}.{cls.__name__}"
|
||||
|
||||
|
||||
def path_to_class(path):
|
||||
def path_to_class(path: Union[str, None]) -> Union[type, None]:
|
||||
"""Import module and return class"""
|
||||
if not path:
|
||||
return None
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
"""Source API Views"""
|
||||
from typing import Any
|
||||
|
||||
from django.http.response import Http404
|
||||
from django.utils.text import slugify
|
||||
from django_filters.filters import AllValuesMultipleFilter
|
||||
from django_filters.filterset import FilterSet
|
||||
from drf_spectacular.types import OpenApiTypes
|
||||
from drf_spectacular.utils import OpenApiResponse, extend_schema, extend_schema_field
|
||||
from drf_spectacular.utils import extend_schema, extend_schema_field
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.exceptions import ValidationError
|
||||
from rest_framework.request import Request
|
||||
|
@ -19,6 +18,9 @@ from authentik.core.api.sources import SourceSerializer
|
|||
from authentik.core.api.used_by import UsedByMixin
|
||||
from authentik.events.monitored_tasks import TaskInfo
|
||||
from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource
|
||||
from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
|
||||
from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer
|
||||
from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
|
||||
|
||||
|
||||
class LDAPSourceSerializer(SourceSerializer):
|
||||
|
@ -95,19 +97,24 @@ class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
|
|||
|
||||
@extend_schema(
|
||||
responses={
|
||||
200: TaskSerializer(many=False),
|
||||
404: OpenApiResponse(description="Task not found"),
|
||||
200: TaskSerializer(many=True),
|
||||
}
|
||||
)
|
||||
@action(methods=["GET"], detail=True)
|
||||
@action(methods=["GET"], detail=True, pagination_class=None, filter_backends=[])
|
||||
# pylint: disable=unused-argument
|
||||
def sync_status(self, request: Request, slug: str) -> Response:
|
||||
"""Get source's sync status"""
|
||||
source = self.get_object()
|
||||
task = TaskInfo.by_name(f"ldap_sync_{slugify(source.name)}")
|
||||
if not task:
|
||||
raise Http404
|
||||
return Response(TaskSerializer(task, many=False).data)
|
||||
results = []
|
||||
for sync_class in [
|
||||
UserLDAPSynchronizer,
|
||||
GroupLDAPSynchronizer,
|
||||
MembershipLDAPSynchronizer,
|
||||
]:
|
||||
task = TaskInfo.by_name(f"ldap_sync_{slugify(source.name)}-{sync_class.__name__}")
|
||||
if task:
|
||||
results.append(task)
|
||||
return Response(TaskSerializer(results, many=True).data)
|
||||
|
||||
|
||||
class LDAPPropertyMappingSerializer(PropertyMappingSerializer):
|
||||
|
|
|
@ -4,7 +4,7 @@ from celery.schedules import crontab
|
|||
CELERY_BEAT_SCHEDULE = {
|
||||
"sources_ldap_sync": {
|
||||
"task": "authentik.sources.ldap.tasks.ldap_sync_all",
|
||||
"schedule": crontab(minute="*/60"), # Run every hour
|
||||
"schedule": crontab(minute="*/120"), # Run every other hour
|
||||
"options": {"queue": "authentik_scheduled"},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,8 +11,12 @@ from authentik.core.models import User
|
|||
from authentik.core.signals import password_changed
|
||||
from authentik.events.models import Event, EventAction
|
||||
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
|
||||
from authentik.lib.utils.reflection import class_to_path
|
||||
from authentik.sources.ldap.models import LDAPSource
|
||||
from authentik.sources.ldap.password import LDAPPasswordChanger
|
||||
from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
|
||||
from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer
|
||||
from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
|
||||
from authentik.sources.ldap.tasks import ldap_sync
|
||||
from authentik.stages.prompt.signals import password_validate
|
||||
|
||||
|
@ -22,7 +26,12 @@ from authentik.stages.prompt.signals import password_validate
|
|||
def sync_ldap_source_on_save(sender, instance: LDAPSource, **_):
|
||||
"""Ensure that source is synced on save (if enabled)"""
|
||||
if instance.enabled:
|
||||
ldap_sync.delay(instance.pk)
|
||||
for sync_class in [
|
||||
UserLDAPSynchronizer,
|
||||
GroupLDAPSynchronizer,
|
||||
MembershipLDAPSynchronizer,
|
||||
]:
|
||||
ldap_sync.delay(instance.pk, class_to_path(sync_class))
|
||||
|
||||
|
||||
@receiver(password_validate)
|
||||
|
|
|
@ -4,6 +4,7 @@ from ldap3.core.exceptions import LDAPException
|
|||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
|
||||
from authentik.lib.utils.reflection import class_to_path, path_to_class
|
||||
from authentik.root.celery import CELERY_APP
|
||||
from authentik.sources.ldap.models import LDAPSource
|
||||
from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
|
||||
|
@ -17,11 +18,18 @@ LOGGER = get_logger()
|
|||
def ldap_sync_all():
|
||||
"""Sync all sources"""
|
||||
for source in LDAPSource.objects.filter(enabled=True):
|
||||
ldap_sync.delay(source.pk)
|
||||
for sync_class in [
|
||||
UserLDAPSynchronizer,
|
||||
GroupLDAPSynchronizer,
|
||||
MembershipLDAPSynchronizer,
|
||||
]:
|
||||
ldap_sync.delay(source.pk, class_to_path(sync_class))
|
||||
|
||||
|
||||
@CELERY_APP.task(bind=True, base=MonitoredTask)
|
||||
def ldap_sync(self: MonitoredTask, source_pk: str):
|
||||
@CELERY_APP.task(
|
||||
bind=True, base=MonitoredTask, soft_time_limit=60 * 60 * 2, task_time_limit=60 * 60 * 2
|
||||
)
|
||||
def ldap_sync(self: MonitoredTask, source_pk: str, sync_class: str):
|
||||
"""Synchronization of an LDAP Source"""
|
||||
self.result_timeout_hours = 2
|
||||
try:
|
||||
|
@ -30,17 +38,13 @@ def ldap_sync(self: MonitoredTask, source_pk: str):
|
|||
# Because the source couldn't be found, we don't have a UID
|
||||
# to set the state with
|
||||
return
|
||||
self.set_uid(slugify(source.name))
|
||||
sync = path_to_class(sync_class)
|
||||
self.set_uid(f"{slugify(source.name)}-{sync.__name__}")
|
||||
try:
|
||||
messages = []
|
||||
for sync_class in [
|
||||
UserLDAPSynchronizer,
|
||||
GroupLDAPSynchronizer,
|
||||
MembershipLDAPSynchronizer,
|
||||
]:
|
||||
sync_inst = sync_class(source)
|
||||
sync_inst = sync(source)
|
||||
count = sync_inst.sync()
|
||||
messages.append(f"Synced {count} objects from {sync_class.__name__}")
|
||||
messages.append(f"Synced {count} objects.")
|
||||
self.set_status(
|
||||
TaskResult(
|
||||
TaskResultStatus.SUCCESSFUL,
|
||||
|
|
|
@ -12018,7 +12018,7 @@ paths:
|
|||
$ref: '#/components/schemas/GenericError'
|
||||
/sources/ldap/{slug}/sync_status/:
|
||||
get:
|
||||
operationId: sources_ldap_sync_status_retrieve
|
||||
operationId: sources_ldap_sync_status_list
|
||||
description: Get source's sync status
|
||||
parameters:
|
||||
- in: path
|
||||
|
@ -12036,10 +12036,10 @@ paths:
|
|||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/Task'
|
||||
description: ''
|
||||
'404':
|
||||
description: Task not found
|
||||
'400':
|
||||
$ref: '#/components/schemas/ValidationError'
|
||||
'403':
|
||||
|
|
|
@ -3464,7 +3464,6 @@ msgstr "Resources"
|
|||
msgid "Result"
|
||||
msgstr "Result"
|
||||
|
||||
#: src/pages/sources/ldap/LDAPSourceViewPage.ts
|
||||
#: src/pages/system-tasks/SystemTaskListPage.ts
|
||||
msgid "Retry Task"
|
||||
msgstr "Retry Task"
|
||||
|
@ -3491,6 +3490,10 @@ msgstr "Return to device picker"
|
|||
msgid "Revoked?"
|
||||
msgstr "Revoked?"
|
||||
|
||||
#: src/pages/sources/ldap/LDAPSourceViewPage.ts
|
||||
msgid "Run sync again"
|
||||
msgstr "Run sync again"
|
||||
|
||||
#: src/pages/property-mappings/PropertyMappingSAMLForm.ts
|
||||
msgid "SAML Attribute Name"
|
||||
msgstr "SAML Attribute Name"
|
||||
|
|
|
@ -3456,7 +3456,6 @@ msgstr ""
|
|||
msgid "Result"
|
||||
msgstr ""
|
||||
|
||||
#: src/pages/sources/ldap/LDAPSourceViewPage.ts
|
||||
#: src/pages/system-tasks/SystemTaskListPage.ts
|
||||
msgid "Retry Task"
|
||||
msgstr ""
|
||||
|
@ -3483,6 +3482,10 @@ msgstr ""
|
|||
msgid "Revoked?"
|
||||
msgstr ""
|
||||
|
||||
#: src/pages/sources/ldap/LDAPSourceViewPage.ts
|
||||
msgid "Run sync again"
|
||||
msgstr ""
|
||||
|
||||
#: src/pages/property-mappings/PropertyMappingSAMLForm.ts
|
||||
msgid "SAML Attribute Name"
|
||||
msgstr ""
|
||||
|
|
|
@ -12,6 +12,7 @@ import PFDisplay from "@patternfly/patternfly/utilities/Display/display.css";
|
|||
import AKGlobal from "../../../authentik.css";
|
||||
import PFBase from "@patternfly/patternfly/patternfly-base.css";
|
||||
import PFButton from "@patternfly/patternfly/components/Button/button.css";
|
||||
import PFList from "@patternfly/patternfly/components/List/list.css";
|
||||
|
||||
import "../../../elements/buttons/SpinnerButton";
|
||||
import "../../../elements/buttons/ActionButton";
|
||||
|
@ -53,6 +54,7 @@ export class LDAPSourceViewPage extends LitElement {
|
|||
PFCard,
|
||||
PFDescriptionList,
|
||||
PFSizing,
|
||||
PFList,
|
||||
AKGlobal,
|
||||
];
|
||||
}
|
||||
|
@ -168,35 +170,34 @@ export class LDAPSourceViewPage extends LitElement {
|
|||
<div class="pf-c-card__body">
|
||||
${until(
|
||||
new SourcesApi(DEFAULT_CONFIG)
|
||||
.sourcesLdapSyncStatusRetrieve({
|
||||
.sourcesLdapSyncStatusList({
|
||||
slug: this.source.slug,
|
||||
})
|
||||
.then((ls) => {
|
||||
let header = html``;
|
||||
if (ls.status === StatusEnum.Warning) {
|
||||
header = html`<p>
|
||||
${t`Task finished with warnings`}
|
||||
</p>`;
|
||||
} else if (status === StatusEnum.Error) {
|
||||
header = html`<p>
|
||||
${t`Task finished with errors`}
|
||||
</p>`;
|
||||
} else {
|
||||
header = html`<p>
|
||||
${t`Last sync: ${ls.taskFinishTimestamp.toLocaleString()}`}
|
||||
</p>`;
|
||||
.then((tasks) => {
|
||||
if (tasks.length < 1) {
|
||||
return html`<p>${t`Not synced yet.`}</p>`;
|
||||
}
|
||||
return html`
|
||||
${header}
|
||||
<ul>
|
||||
${ls.messages.map((m) => {
|
||||
return html`<ul class="pf-c-list">
|
||||
${tasks.map((task) => {
|
||||
let header = "";
|
||||
if (task.status === StatusEnum.Warning) {
|
||||
header = t`Task finished with warnings`;
|
||||
} else if (task.status === StatusEnum.Error) {
|
||||
header = t`Task finished with errors`;
|
||||
} else {
|
||||
header = t`Last sync: ${task.taskFinishTimestamp.toLocaleString()}`;
|
||||
}
|
||||
return html`<li>
|
||||
<p>${task.taskName}</p>
|
||||
<ul class="pf-c-list">
|
||||
<li>${header}</li>
|
||||
${task.messages.map((m) => {
|
||||
return html`<li>${m}</li>`;
|
||||
})}
|
||||
</ul>
|
||||
`;
|
||||
})
|
||||
.catch(() => {
|
||||
return html`<p>${t`Not synced yet.`}</p>`;
|
||||
</li> `;
|
||||
})}
|
||||
</ul>`;
|
||||
}),
|
||||
"loading",
|
||||
)}
|
||||
|
@ -212,7 +213,7 @@ export class LDAPSourceViewPage extends LitElement {
|
|||
});
|
||||
}}
|
||||
>
|
||||
${t`Retry Task`}
|
||||
${t`Run sync again`}
|
||||
</ak-action-button>
|
||||
</div>
|
||||
</div>
|
||||
|
|
Reference in a new issue