root: replace celery queues with priority

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens Langhammer 2023-09-11 19:33:54 +02:00
parent 35147230d7
commit 7d1efd7450
No known key found for this signature in database
15 changed files with 42 additions and 18 deletions

View file

@ -1,12 +1,13 @@
"""authentik admin settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"admin_latest_version": {
"task": "authentik.admin.tasks.update_latest_version",
"schedule": crontab(minute=fqdn_rand("admin_latest_version"), hour="*"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}
}

View file

@ -1,17 +1,18 @@
"""blueprint Settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"blueprints_v1_discover": {
"task": "authentik.blueprints.v1.tasks.blueprints_discovery",
"schedule": crontab(minute=fqdn_rand("blueprints_v1_discover"), hour="*"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
"blueprints_v1_cleanup": {
"task": "authentik.blueprints.v1.tasks.clear_failed_blueprints",
"schedule": crontab(minute=fqdn_rand("blueprints_v1_cleanup"), hour="*"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
}

View file

@ -33,7 +33,6 @@ class Command(BaseCommand):
task_events=True,
beat=options.get("beat", True),
schedule_filename=f"{tempdir}/celerybeat-schedule",
queues=["authentik", "authentik_scheduled", "authentik_events"],
)
for task in CELERY_APP.tasks:
LOGGER.debug("Registered task", task=task)

View file

@ -1,12 +1,13 @@
"""Crypto task Settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"crypto_certificate_discovery": {
"task": "authentik.crypto.tasks.certificate_discovery",
"schedule": crontab(minute=fqdn_rand("crypto_certificate_discovery"), hour="*"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
}

View file

@ -1,12 +1,13 @@
"""Enterprise additional settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"enterprise_calculate_license": {
"task": "authentik.enterprise.tasks.calculate_license",
"schedule": crontab(minute=fqdn_rand("calculate_license"), hour="*/8"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}
}

View file

@ -1,12 +1,13 @@
"""Event Settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"events_notification_cleanup": {
"task": "authentik.events.tasks.notification_cleanup",
"schedule": crontab(minute=fqdn_rand("notification_cleanup"), hour="*/8"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
}

View file

@ -20,6 +20,7 @@ from authentik.events.monitored_tasks import (
TaskResultStatus,
prefill_task,
)
from authentik.lib.config import CONFIG
from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode
from authentik.root.celery import CELERY_APP
@ -89,7 +90,7 @@ def event_trigger_handler(event_uuid: str, trigger_name: str):
user.pk,
str(trigger.pk),
],
queue="authentik_events",
priority=CONFIG.get_int("worker.priority.events"),
)
if transport.send_once:
break

View file

@ -114,3 +114,8 @@ web:
worker:
concurrency: 2
priority:
default: 4
scheduled: 9
sync: 9
events: 8

View file

@ -1,27 +1,28 @@
"""Outposts Settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"outposts_controller": {
"task": "authentik.outposts.tasks.outpost_controller_all",
"schedule": crontab(minute=fqdn_rand("outposts_controller"), hour="*/4"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
"outposts_service_connection_check": {
"task": "authentik.outposts.tasks.outpost_service_connection_monitor",
"schedule": crontab(minute="3-59/15"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
"outpost_token_ensurer": {
"task": "authentik.outposts.tasks.outpost_token_ensurer",
"schedule": crontab(minute=fqdn_rand("outpost_token_ensurer"), hour="*/8"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
"outpost_connection_discovery": {
"task": "authentik.outposts.tasks.outpost_connection_discovery",
"schedule": crontab(minute=fqdn_rand("outpost_connection_discovery"), hour="*/8"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
}

View file

@ -1,10 +1,12 @@
"""Reputation Settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
CELERY_BEAT_SCHEDULE = {
"policies_reputation_save": {
"task": "authentik.policies.reputation.tasks.save_reputation",
"schedule": crontab(minute="1-59/5"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
}

View file

@ -1,12 +1,13 @@
"""SCIM task Settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"providers_scim_sync": {
"task": "authentik.providers.scim.tasks.scim_sync_all",
"schedule": crontab(minute=fqdn_rand("scim_sync_all"), hour="*"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
}

View file

@ -339,17 +339,23 @@ CELERY = {
"clean_expired_models": {
"task": "authentik.core.tasks.clean_expired_models",
"schedule": crontab(minute="2-59/5"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
"user_cleanup": {
"task": "authentik.core.tasks.clean_temporary_users",
"schedule": crontab(minute="9-59/5"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
},
"task_create_missing_queues": True,
"task_default_queue": "authentik",
"task_default_priority": 4,
"task_inherit_parent_priority": True,
"broker_url": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
"broker_transport_options": {
"queue_order_strategy": "priority",
"priority_steps": list(range(10)),
},
"result_backend": f"{_redis_url}/{CONFIG.get('redis.db')}{_redis_celery_tls_requirements}",
}

View file

@ -1,12 +1,13 @@
"""LDAP Settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"sources_ldap_sync": {
"task": "authentik.sources.ldap.tasks.ldap_sync_all",
"schedule": crontab(minute=fqdn_rand("sources_ldap_sync"), hour="*/2"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
}
}

View file

@ -58,10 +58,12 @@ def ldap_sync_single(source_pk: str):
group(
ldap_sync_paginator(source, UserLDAPSynchronizer)
+ ldap_sync_paginator(source, GroupLDAPSynchronizer),
priority=CONFIG.get_int("worker.priority.sync"),
),
# Membership sync needs to run afterwards
group(
ldap_sync_paginator(source, MembershipLDAPSynchronizer),
priority=CONFIG.get_int("worker.priority.sync"),
),
)
task()

View file

@ -1,12 +1,13 @@
"""Plex source settings"""
from celery.schedules import crontab
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import fqdn_rand
CELERY_BEAT_SCHEDULE = {
"check_plex_token": {
"task": "authentik.sources.plex.tasks.check_plex_token_all",
"schedule": crontab(minute=fqdn_rand("check_plex_token"), hour="*/3"),
"options": {"queue": "authentik_scheduled"},
"options": {"priority": CONFIG.get_int("worker.priority.scheduled")},
},
}