From e9d9d658c4a5d515cd10449fdce18b264e3d46f2 Mon Sep 17 00:00:00 2001 From: Jens L Date: Fri, 15 Jul 2022 19:44:45 +0200 Subject: [PATCH] lifecycle: make worker wait for migrations to be done (#3254) * lifecycle: make worker wait for migrations to be done * retry managed reconcile task --- authentik/managed/tasks.py | 7 ++++++- lifecycle/ak | 5 +++++ lifecycle/migrate.py | 10 ++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/authentik/managed/tasks.py b/authentik/managed/tasks.py index 2cc9b21d2..ef7683506 100644 --- a/authentik/managed/tasks.py +++ b/authentik/managed/tasks.py @@ -11,7 +11,11 @@ from authentik.events.monitored_tasks import ( from authentik.managed.manager import ObjectManager -@CELERY_APP.task(bind=True, base=MonitoredTask) +@CELERY_APP.task( + bind=True, + base=MonitoredTask, + retry_backoff=True, +) @prefill_task def managed_reconcile(self: MonitoredTask): """Run ObjectManager to ensure objects are up-to-date""" @@ -22,3 +26,4 @@ def managed_reconcile(self: MonitoredTask): ) except DatabaseError as exc: # pragma: no cover self.set_status(TaskResult(TaskResultStatus.WARNING, [str(exc)])) + self.retry() diff --git a/lifecycle/ak b/lifecycle/ak index c304952ff..2f1c83858 100755 --- a/lifecycle/ak +++ b/lifecycle/ak @@ -44,6 +44,11 @@ if [[ "$1" == "server" ]]; then /authentik-proxy elif [[ "$1" == "worker" ]]; then wait_for_db + # Check if the migration lock is set, and exit if so + # the orchestrator should restart this container, and this prevents + # errors when startup tasks are attempted to be run without + # migrations in place + python -m lifecycle.migrate check_lock echo "worker" > $MODE_FILE check_if_root "celery -A authentik.root.celery worker -Ofair --max-tasks-per-child=1 --autoscale 3,1 -E -B -s /tmp/celerybeat-schedule -Q authentik,authentik_scheduled,authentik_events" elif [[ "$1" == "bash" ]]; then diff --git a/lifecycle/migrate.py b/lifecycle/migrate.py index 504d398ab..898faa0ea 100755 --- a/lifecycle/migrate.py +++ b/lifecycle/migrate.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """System Migration handler""" import os +import sys from importlib.util import module_from_spec, spec_from_file_location from inspect import getmembers, isclass from pathlib import Path @@ -50,7 +51,16 @@ def release_lock(): curr.execute("SELECT pg_advisory_unlock(%s)", (ADV_LOCK_UID,)) +def is_locked(): + """Check if lock is currently active (used by worker to wait for migrations)""" + curr.executor("SELECT count(*) FROM pg_locks WHERE objid = %s", (ADV_LOCK_UID,)) + return curr.rowcount + + if __name__ == "__main__": + if len(sys.argv) > 1: + if sys.argv[1] == "check_lock": + sys.exit(is_locked()) conn = connect( dbname=CONFIG.y("postgresql.name"),