From 06af306e8a67c272202b35a04569c56ee328dd4f Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Fri, 17 Sep 2021 12:24:02 +0200 Subject: [PATCH] sources/ldap: bump timeout, run each sync component in its own task closes #1411 Signed-off-by: Jens Langhammer --- authentik/lib/utils/reflection.py | 5 +- authentik/sources/ldap/api.py | 25 ++++++--- authentik/sources/ldap/settings.py | 2 +- authentik/sources/ldap/signals.py | 11 +++- authentik/sources/ldap/tasks.py | 28 ++++++---- schema.yml | 8 +-- web/src/locales/en.po | 5 +- web/src/locales/pseudo-LOCALE.po | 5 +- .../pages/sources/ldap/LDAPSourceViewPage.ts | 55 ++++++++++--------- 9 files changed, 86 insertions(+), 58 deletions(-) diff --git a/authentik/lib/utils/reflection.py b/authentik/lib/utils/reflection.py index 1cfd73d86..c5acfbc0c 100644 --- a/authentik/lib/utils/reflection.py +++ b/authentik/lib/utils/reflection.py @@ -1,5 +1,6 @@ """authentik lib reflection utilities""" from importlib import import_module +from typing import Union from django.conf import settings @@ -19,12 +20,12 @@ def all_subclasses(cls, sort=True): return classes -def class_to_path(cls): +def class_to_path(cls: type) -> str: """Turn Class (Class or instance) into module path""" return f"{cls.__module__}.{cls.__name__}" -def path_to_class(path): +def path_to_class(path: Union[str, None]) -> Union[type, None]: """Import module and return class""" if not path: return None diff --git a/authentik/sources/ldap/api.py b/authentik/sources/ldap/api.py index 918940c30..aae8e643a 100644 --- a/authentik/sources/ldap/api.py +++ b/authentik/sources/ldap/api.py @@ -1,12 +1,11 @@ """Source API Views""" from typing import Any -from django.http.response import Http404 from django.utils.text import slugify from django_filters.filters import AllValuesMultipleFilter from django_filters.filterset import FilterSet from drf_spectacular.types import OpenApiTypes -from drf_spectacular.utils import OpenApiResponse, extend_schema, extend_schema_field +from drf_spectacular.utils import extend_schema, extend_schema_field from rest_framework.decorators import action from rest_framework.exceptions import ValidationError from rest_framework.request import Request @@ -19,6 +18,9 @@ from authentik.core.api.sources import SourceSerializer from authentik.core.api.used_by import UsedByMixin from authentik.events.monitored_tasks import TaskInfo from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource +from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer +from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer +from authentik.sources.ldap.sync.users import UserLDAPSynchronizer class LDAPSourceSerializer(SourceSerializer): @@ -95,19 +97,24 @@ class LDAPSourceViewSet(UsedByMixin, ModelViewSet): @extend_schema( responses={ - 200: TaskSerializer(many=False), - 404: OpenApiResponse(description="Task not found"), + 200: TaskSerializer(many=True), } ) - @action(methods=["GET"], detail=True) + @action(methods=["GET"], detail=True, pagination_class=None, filter_backends=[]) # pylint: disable=unused-argument def sync_status(self, request: Request, slug: str) -> Response: """Get source's sync status""" source = self.get_object() - task = TaskInfo.by_name(f"ldap_sync_{slugify(source.name)}") - if not task: - raise Http404 - return Response(TaskSerializer(task, many=False).data) + results = [] + for sync_class in [ + UserLDAPSynchronizer, + GroupLDAPSynchronizer, + MembershipLDAPSynchronizer, + ]: + task = TaskInfo.by_name(f"ldap_sync_{slugify(source.name)}-{sync_class.__name__}") + if task: + results.append(task) + return Response(TaskSerializer(results, many=True).data) class LDAPPropertyMappingSerializer(PropertyMappingSerializer): diff --git a/authentik/sources/ldap/settings.py b/authentik/sources/ldap/settings.py index d3b90e901..224b240ac 100644 --- a/authentik/sources/ldap/settings.py +++ b/authentik/sources/ldap/settings.py @@ -4,7 +4,7 @@ from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { "sources_ldap_sync": { "task": "authentik.sources.ldap.tasks.ldap_sync_all", - "schedule": crontab(minute="*/60"), # Run every hour + "schedule": crontab(minute="*/120"), # Run every other hour "options": {"queue": "authentik_scheduled"}, } } diff --git a/authentik/sources/ldap/signals.py b/authentik/sources/ldap/signals.py index fc7edc74f..3565a1543 100644 --- a/authentik/sources/ldap/signals.py +++ b/authentik/sources/ldap/signals.py @@ -11,8 +11,12 @@ from authentik.core.models import User from authentik.core.signals import password_changed from authentik.events.models import Event, EventAction from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER +from authentik.lib.utils.reflection import class_to_path from authentik.sources.ldap.models import LDAPSource from authentik.sources.ldap.password import LDAPPasswordChanger +from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer +from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer +from authentik.sources.ldap.sync.users import UserLDAPSynchronizer from authentik.sources.ldap.tasks import ldap_sync from authentik.stages.prompt.signals import password_validate @@ -22,7 +26,12 @@ from authentik.stages.prompt.signals import password_validate def sync_ldap_source_on_save(sender, instance: LDAPSource, **_): """Ensure that source is synced on save (if enabled)""" if instance.enabled: - ldap_sync.delay(instance.pk) + for sync_class in [ + UserLDAPSynchronizer, + GroupLDAPSynchronizer, + MembershipLDAPSynchronizer, + ]: + ldap_sync.delay(instance.pk, class_to_path(sync_class)) @receiver(password_validate) diff --git a/authentik/sources/ldap/tasks.py b/authentik/sources/ldap/tasks.py index aaf28c132..55a257c9e 100644 --- a/authentik/sources/ldap/tasks.py +++ b/authentik/sources/ldap/tasks.py @@ -4,6 +4,7 @@ from ldap3.core.exceptions import LDAPException from structlog.stdlib import get_logger from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus +from authentik.lib.utils.reflection import class_to_path, path_to_class from authentik.root.celery import CELERY_APP from authentik.sources.ldap.models import LDAPSource from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer @@ -17,11 +18,18 @@ LOGGER = get_logger() def ldap_sync_all(): """Sync all sources""" for source in LDAPSource.objects.filter(enabled=True): - ldap_sync.delay(source.pk) + for sync_class in [ + UserLDAPSynchronizer, + GroupLDAPSynchronizer, + MembershipLDAPSynchronizer, + ]: + ldap_sync.delay(source.pk, class_to_path(sync_class)) -@CELERY_APP.task(bind=True, base=MonitoredTask) -def ldap_sync(self: MonitoredTask, source_pk: str): +@CELERY_APP.task( + bind=True, base=MonitoredTask, soft_time_limit=60 * 60 * 2, task_time_limit=60 * 60 * 2 +) +def ldap_sync(self: MonitoredTask, source_pk: str, sync_class: str): """Synchronization of an LDAP Source""" self.result_timeout_hours = 2 try: @@ -30,17 +38,13 @@ def ldap_sync(self: MonitoredTask, source_pk: str): # Because the source couldn't be found, we don't have a UID # to set the state with return - self.set_uid(slugify(source.name)) + sync = path_to_class(sync_class) + self.set_uid(f"{slugify(source.name)}-{sync.__name__}") try: messages = [] - for sync_class in [ - UserLDAPSynchronizer, - GroupLDAPSynchronizer, - MembershipLDAPSynchronizer, - ]: - sync_inst = sync_class(source) - count = sync_inst.sync() - messages.append(f"Synced {count} objects from {sync_class.__name__}") + sync_inst = sync(source) + count = sync_inst.sync() + messages.append(f"Synced {count} objects.") self.set_status( TaskResult( TaskResultStatus.SUCCESSFUL, diff --git a/schema.yml b/schema.yml index 8697d5c07..742f04c9d 100644 --- a/schema.yml +++ b/schema.yml @@ -12018,7 +12018,7 @@ paths: $ref: '#/components/schemas/GenericError' /sources/ldap/{slug}/sync_status/: get: - operationId: sources_ldap_sync_status_retrieve + operationId: sources_ldap_sync_status_list description: Get source's sync status parameters: - in: path @@ -12036,10 +12036,10 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Task' + type: array + items: + $ref: '#/components/schemas/Task' description: '' - '404': - description: Task not found '400': $ref: '#/components/schemas/ValidationError' '403': diff --git a/web/src/locales/en.po b/web/src/locales/en.po index d587f4d24..2d03005a1 100644 --- a/web/src/locales/en.po +++ b/web/src/locales/en.po @@ -3464,7 +3464,6 @@ msgstr "Resources" msgid "Result" msgstr "Result" -#: src/pages/sources/ldap/LDAPSourceViewPage.ts #: src/pages/system-tasks/SystemTaskListPage.ts msgid "Retry Task" msgstr "Retry Task" @@ -3491,6 +3490,10 @@ msgstr "Return to device picker" msgid "Revoked?" msgstr "Revoked?" +#: src/pages/sources/ldap/LDAPSourceViewPage.ts +msgid "Run sync again" +msgstr "Run sync again" + #: src/pages/property-mappings/PropertyMappingSAMLForm.ts msgid "SAML Attribute Name" msgstr "SAML Attribute Name" diff --git a/web/src/locales/pseudo-LOCALE.po b/web/src/locales/pseudo-LOCALE.po index b51696342..a205f30bd 100644 --- a/web/src/locales/pseudo-LOCALE.po +++ b/web/src/locales/pseudo-LOCALE.po @@ -3456,7 +3456,6 @@ msgstr "" msgid "Result" msgstr "" -#: src/pages/sources/ldap/LDAPSourceViewPage.ts #: src/pages/system-tasks/SystemTaskListPage.ts msgid "Retry Task" msgstr "" @@ -3483,6 +3482,10 @@ msgstr "" msgid "Revoked?" msgstr "" +#: src/pages/sources/ldap/LDAPSourceViewPage.ts +msgid "Run sync again" +msgstr "" + #: src/pages/property-mappings/PropertyMappingSAMLForm.ts msgid "SAML Attribute Name" msgstr "" diff --git a/web/src/pages/sources/ldap/LDAPSourceViewPage.ts b/web/src/pages/sources/ldap/LDAPSourceViewPage.ts index f2a57f7f9..07fd0173c 100644 --- a/web/src/pages/sources/ldap/LDAPSourceViewPage.ts +++ b/web/src/pages/sources/ldap/LDAPSourceViewPage.ts @@ -12,6 +12,7 @@ import PFDisplay from "@patternfly/patternfly/utilities/Display/display.css"; import AKGlobal from "../../../authentik.css"; import PFBase from "@patternfly/patternfly/patternfly-base.css"; import PFButton from "@patternfly/patternfly/components/Button/button.css"; +import PFList from "@patternfly/patternfly/components/List/list.css"; import "../../../elements/buttons/SpinnerButton"; import "../../../elements/buttons/ActionButton"; @@ -53,6 +54,7 @@ export class LDAPSourceViewPage extends LitElement { PFCard, PFDescriptionList, PFSizing, + PFList, AKGlobal, ]; } @@ -168,35 +170,34 @@ export class LDAPSourceViewPage extends LitElement {
${until( new SourcesApi(DEFAULT_CONFIG) - .sourcesLdapSyncStatusRetrieve({ + .sourcesLdapSyncStatusList({ slug: this.source.slug, }) - .then((ls) => { - let header = html``; - if (ls.status === StatusEnum.Warning) { - header = html`

- ${t`Task finished with warnings`} -

`; - } else if (status === StatusEnum.Error) { - header = html`

- ${t`Task finished with errors`} -

`; - } else { - header = html`

- ${t`Last sync: ${ls.taskFinishTimestamp.toLocaleString()}`} -

`; + .then((tasks) => { + if (tasks.length < 1) { + return html`

${t`Not synced yet.`}

`; } - return html` - ${header} -
    - ${ls.messages.map((m) => { - return html`
  • ${m}
  • `; - })} -
- `; - }) - .catch(() => { - return html`

${t`Not synced yet.`}

`; + return html`
    + ${tasks.map((task) => { + let header = ""; + if (task.status === StatusEnum.Warning) { + header = t`Task finished with warnings`; + } else if (task.status === StatusEnum.Error) { + header = t`Task finished with errors`; + } else { + header = t`Last sync: ${task.taskFinishTimestamp.toLocaleString()}`; + } + return html`
  • +

    ${task.taskName}

    +
      +
    • ${header}
    • + ${task.messages.map((m) => { + return html`
    • ${m}
    • `; + })} +
    +
  • `; + })} +
`; }), "loading", )} @@ -212,7 +213,7 @@ export class LDAPSourceViewPage extends LitElement { }); }} > - ${t`Retry Task`} + ${t`Run sync again`}