diff --git a/authentik/root/celery.py b/authentik/root/celery.py index bb72433a2..136e004fa 100644 --- a/authentik/root/celery.py +++ b/authentik/root/celery.py @@ -2,9 +2,12 @@ import os from contextvars import ContextVar from logging.config import dictConfig +from pathlib import Path +from tempfile import gettempdir from typing import Callable -from celery import Celery +from celery import Celery, bootsteps +from celery.apps.worker import Worker from celery.signals import ( after_task_publish, setup_logging, @@ -28,6 +31,7 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "authentik.root.settings") LOGGER = get_logger() CELERY_APP = Celery("authentik") CTX_TASK_ID = ContextVar(STRUCTLOG_KEY_PREFIX + "task_id", default=Ellipsis) +HEARTBEAT_FILE = Path(gettempdir() + "/authentik-worker") @setup_logging.connect @@ -99,6 +103,33 @@ def worker_ready_hook(*args, **kwargs): start_blueprint_watcher() +class LivenessProbe(bootsteps.StartStopStep): + """Add a timed task to touch a temporary file for healthchecking reasons""" + + requires = {"celery.worker.components:Timer"} + + def __init__(self, parent, **kwargs): + super().__init__(parent, **kwargs) + self.requests = [] + self.tref = None + + def start(self, parent: Worker): + self.tref = parent.timer.call_repeatedly( + 10.0, + self.update_heartbeat_file, + (parent,), + priority=10, + ) + self.update_heartbeat_file(parent) + + def stop(self, parent: Worker): + HEARTBEAT_FILE.unlink(missing_ok=True) + + def update_heartbeat_file(self, worker: Worker): + """Touch heartbeat file""" + HEARTBEAT_FILE.touch() + + # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys @@ -107,3 +138,4 @@ CELERY_APP.config_from_object(settings, namespace="CELERY") # Load task modules from all registered Django app configs. CELERY_APP.autodiscover_tasks() +CELERY_APP.steps["worker"].add(LivenessProbe) diff --git a/lifecycle/ak b/lifecycle/ak index fa183bc6a..10f72b421 100755 --- a/lifecycle/ak +++ b/lifecycle/ak @@ -1,5 +1,6 @@ #!/bin/bash -e MODE_FILE="${TMPDIR}/authentik-mode" +WORKER_HEARTBEAT="${TMPDIR}/authentik-worker" function log { printf '{"event": "%s", "level": "info", "logger": "bootstrap"}\n' "$@" > /dev/stderr @@ -80,7 +81,13 @@ elif [[ "$1" == "healthcheck" ]]; then if [[ $mode == "server" ]]; then exec curl --user-agent "goauthentik.io lifecycle Healthcheck" -I http://localhost:9000/-/health/ready/ elif [[ $mode == "worker" ]]; then - exec celery -A authentik.root.celery inspect ping -d celery@$HOSTNAME --timeout 5 -j + mtime=$(stat -f %m $WORKER_HEARTBEAT) + time=$(date +"%s") + if [ "$(( $time - $mtime ))" -gt "30" ]; then + log "Worker hasn't updated heartbeat in 30 seconds" + exit 1 + fi + exit 0 fi elif [[ "$1" == "dump_config" ]]; then exec python -m authentik.lib.config