Compare commits

...
This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.

1 commit

Author SHA1 Message Date
Jens Langhammer 7d1efd7450
root: replace celery queues with priority
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
2023-10-18 12:50:24 +02:00
15 changed files with 42 additions and 18 deletions

View file

@ -1,12 +1,13 @@
"""authentik admin settings""" """authentik admin settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"admin_latest_version": { "admin_latest_version": {
"task": "authentik.admin.tasks.update_latest_version", "task": "authentik.admin.tasks.update_latest_version",
"schedule": crontab(minute=fqdn_rand("admin_latest_version"), hour="*"), "schedule": crontab(minute=fqdn_rand("admin_latest_version"), hour="*"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
} }
} }

View file

@ -1,17 +1,18 @@
"""blueprint Settings""" """blueprint Settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"blueprints_v1_discover": { "blueprints_v1_discover": {
"task": "authentik.blueprints.v1.tasks.blueprints_discovery", "task": "authentik.blueprints.v1.tasks.blueprints_discovery",
"schedule": crontab(minute=fqdn_rand("blueprints_v1_discover"), hour="*"), "schedule": crontab(minute=fqdn_rand("blueprints_v1_discover"), hour="*"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
"blueprints_v1_cleanup": { "blueprints_v1_cleanup": {
"task": "authentik.blueprints.v1.tasks.clear_failed_blueprints", "task": "authentik.blueprints.v1.tasks.clear_failed_blueprints",
"schedule": crontab(minute=fqdn_rand("blueprints_v1_cleanup"), hour="*"), "schedule": crontab(minute=fqdn_rand("blueprints_v1_cleanup"), hour="*"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
} }

View file

@ -33,7 +33,6 @@ class Command(BaseCommand):
task_events=True, task_events=True,
beat=options.get("beat", True), beat=options.get("beat", True),
schedule_filename=f"{tempdir}/celerybeat-schedule", schedule_filename=f"{tempdir}/celerybeat-schedule",
queues=["authentik", "authentik_scheduled", "authentik_events"],
) )
for task in CELERY_APP.tasks: for task in CELERY_APP.tasks:
LOGGER.debug("Registered task", task=task) LOGGER.debug("Registered task", task=task)

View file

@ -1,12 +1,13 @@
"""Crypto task Settings""" """Crypto task Settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"crypto_certificate_discovery": { "crypto_certificate_discovery": {
"task": "authentik.crypto.tasks.certificate_discovery", "task": "authentik.crypto.tasks.certificate_discovery",
"schedule": crontab(minute=fqdn_rand("crypto_certificate_discovery"), hour="*"), "schedule": crontab(minute=fqdn_rand("crypto_certificate_discovery"), hour="*"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
} }

View file

@ -1,12 +1,13 @@
"""Enterprise additional settings""" """Enterprise additional settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"enterprise_calculate_license": { "enterprise_calculate_license": {
"task": "authentik.enterprise.tasks.calculate_license", "task": "authentik.enterprise.tasks.calculate_license",
"schedule": crontab(minute=fqdn_rand("calculate_license"), hour="*/8"), "schedule": crontab(minute=fqdn_rand("calculate_license"), hour="*/8"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
} }
} }

View file

@ -1,12 +1,13 @@
"""Event Settings""" """Event Settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"events_notification_cleanup": { "events_notification_cleanup": {
"task": "authentik.events.tasks.notification_cleanup", "task": "authentik.events.tasks.notification_cleanup",
"schedule": crontab(minute=fqdn_rand("notification_cleanup"), hour="*/8"), "schedule": crontab(minute=fqdn_rand("notification_cleanup"), hour="*/8"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
} }

View file

@ -20,6 +20,7 @@ from authentik.events.monitored_tasks import (
TaskResultStatus, TaskResultStatus,
prefill_task, prefill_task,
) )
from authentik.lib.config import CONFIG
from authentik.policies.engine import PolicyEngine from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode from authentik.policies.models import PolicyBinding, PolicyEngineMode
from authentik.root.celery import CELERY_APP from authentik.root.celery import CELERY_APP
@ -89,7 +90,7 @@ def event_trigger_handler(event_uuid: str, trigger_name: str):
user.pk, user.pk,
str(trigger.pk), str(trigger.pk),
], ],
queue="authentik_events", priority=CONFIG.get_int("worker.priority.events"),
) )
if transport.send_once: if transport.send_once:
break break

View file

@ -114,3 +114,8 @@ web:
worker: worker:
concurrency: 2 concurrency: 2
priority:
default: 4
scheduled: 9
sync: 9
events: 8

View file

@ -1,27 +1,28 @@
"""Outposts Settings""" """Outposts Settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"outposts_controller": { "outposts_controller": {
"task": "authentik.outposts.tasks.outpost_controller_all", "task": "authentik.outposts.tasks.outpost_controller_all",
"schedule": crontab(minute=fqdn_rand("outposts_controller"), hour="*/4"), "schedule": crontab(minute=fqdn_rand("outposts_controller"), hour="*/4"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
"outposts_service_connection_check": { "outposts_service_connection_check": {
"task": "authentik.outposts.tasks.outpost_service_connection_monitor", "task": "authentik.outposts.tasks.outpost_service_connection_monitor",
"schedule": crontab(minute="3-59/15"), "schedule": crontab(minute="3-59/15"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
"outpost_token_ensurer": { "outpost_token_ensurer": {
"task": "authentik.outposts.tasks.outpost_token_ensurer", "task": "authentik.outposts.tasks.outpost_token_ensurer",
"schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"), "schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
"outpost_connection_discovery": { "outpost_connection_discovery": {
"task": "authentik.outposts.tasks.outpost_connection_discovery", "task": "authentik.outposts.tasks.outpost_connection_discovery",
"schedule": crontab(minute=fqdn_rand("outpost_connection_discovery"), hour="*/8"), "schedule": crontab(minute=fqdn_rand("outpost_connection_discovery"), hour="*/8"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
} }

View file

@ -1,10 +1,12 @@
"""Reputation Settings""" """Reputation Settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"policies_reputation_save": { "policies_reputation_save": {
"task": "authentik.policies.reputation.tasks.save_reputation", "task": "authentik.policies.reputation.tasks.save_reputation",
"schedule": crontab(minute="1-59/5"), "schedule": crontab(minute="1-59/5"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
} }

View file

@ -1,12 +1,13 @@
"""SCIM task Settings""" """SCIM task Settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"providers_scim_sync": { "providers_scim_sync": {
"task": "authentik.providers.scim.tasks.scim_sync_all", "task": "authentik.providers.scim.tasks.scim_sync_all",
"schedule": crontab(minute=fqdn_rand("scim_sync_all"), hour="*"), "schedule": crontab(minute=fqdn_rand("scim_sync_all"), hour="*"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
} }

View file

@ -339,17 +339,23 @@ CELERY = {
"clean_expired_models": { "clean_expired_models": {
"task": "authentik.core.tasks.clean_expired_models", "task": "authentik.core.tasks.clean_expired_models",
"schedule": crontab(minute="2-59/5"), "schedule": crontab(minute="2-59/5"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
"user_cleanup": { "user_cleanup": {
"task": "authentik.core.tasks.clean_temporary_users", "task": "authentik.core.tasks.clean_temporary_users",
"schedule": crontab(minute="9-59/5"), "schedule": crontab(minute="9-59/5"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
}, },
"task_create_missing_queues": True, "task_create_missing_queues": True,
"task_default_queue": "authentik", "task_default_queue": "authentik",
"task_default_priority": 4,
"task_inherit_parent_priority": True,
"broker_url": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}", "broker_url": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
"broker_transport_options": {
"queue_order_strategy": "priority",
"priority_steps": list(range(10)),
},
"result_backend": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}", "result_backend": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
} }

View file

@ -1,12 +1,13 @@
"""LDAP Settings""" """LDAP Settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"sources_ldap_sync": { "sources_ldap_sync": {
"task": "authentik.sources.ldap.tasks.ldap_sync_all", "task": "authentik.sources.ldap.tasks.ldap_sync_all",
"schedule": crontab(minute=fqdn_rand("sources_ldap_sync"), hour="*/2"), "schedule": crontab(minute=fqdn_rand("sources_ldap_sync"), hour="*/2"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
} }
} }

View file

@ -58,10 +58,12 @@ def ldap_sync_single(source_pk: str):
group( group(
ldap_sync_paginator(source, UserLDAPSynchronizer) ldap_sync_paginator(source, UserLDAPSynchronizer)
+ ldap_sync_paginator(source, GroupLDAPSynchronizer), + ldap_sync_paginator(source, GroupLDAPSynchronizer),
priority=CONFIG.get_int("worker.priority.sync"),
), ),
# Membership sync needs to run afterwards # Membership sync needs to run afterwards
group( group(
ldap_sync_paginator(source, MembershipLDAPSynchronizer), ldap_sync_paginator(source, MembershipLDAPSynchronizer),
priority=CONFIG.get_int("worker.priority.sync"),
), ),
) )
task() task()

View file

@ -1,12 +1,13 @@
"""Plex source settings""" """Plex source settings"""
from celery.schedules import crontab from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
"check_plex_token": { "check_plex_token": {
"task": "authentik.sources.plex.tasks.check_plex_token_all", "task": "authentik.sources.plex.tasks.check_plex_token_all",
"schedule": crontab(minute=fqdn_rand("check_plex_token"), hour="*/3"), "schedule": crontab(minute=fqdn_rand("check_plex_token"), hour="*/3"),
"options": {"queue": "authentik_scheduled"}, "options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}, },
} }