lifecycle: don't use celery ping for worker healthcheck (#5153)
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
parent
a92786e153
commit
02f75a92ce
|
@ -2,9 +2,12 @@
|
|||
import os
|
||||
from contextvars import ContextVar
|
||||
from logging.config import dictConfig
|
||||
from pathlib import Path
|
||||
from tempfile import gettempdir
|
||||
from typing import Callable
|
||||
|
||||
from celery import Celery
|
||||
from celery import Celery, bootsteps
|
||||
from celery.apps.worker import Worker
|
||||
from celery.signals import (
|
||||
after_task_publish,
|
||||
setup_logging,
|
||||
|
@ -28,6 +31,7 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "authentik.root.settings")
|
|||
LOGGER = get_logger()
|
||||
CELERY_APP = Celery("authentik")
|
||||
CTX_TASK_ID = ContextVar(STRUCTLOG_KEY_PREFIX + "task_id", default=Ellipsis)
|
||||
HEARTBEAT_FILE = Path(gettempdir() + "/authentik-worker")
|
||||
|
||||
|
||||
@setup_logging.connect
|
||||
|
@ -99,6 +103,33 @@ def worker_ready_hook(*args, **kwargs):
|
|||
start_blueprint_watcher()
|
||||
|
||||
|
||||
class LivenessProbe(bootsteps.StartStopStep):
|
||||
"""Add a timed task to touch a temporary file for healthchecking reasons"""
|
||||
|
||||
requires = {"celery.worker.components:Timer"}
|
||||
|
||||
def __init__(self, parent, **kwargs):
|
||||
super().__init__(parent, **kwargs)
|
||||
self.requests = []
|
||||
self.tref = None
|
||||
|
||||
def start(self, parent: Worker):
|
||||
self.tref = parent.timer.call_repeatedly(
|
||||
10.0,
|
||||
self.update_heartbeat_file,
|
||||
(parent,),
|
||||
priority=10,
|
||||
)
|
||||
self.update_heartbeat_file(parent)
|
||||
|
||||
def stop(self, parent: Worker):
|
||||
HEARTBEAT_FILE.unlink(missing_ok=True)
|
||||
|
||||
def update_heartbeat_file(self, worker: Worker):
|
||||
"""Touch heartbeat file"""
|
||||
HEARTBEAT_FILE.touch()
|
||||
|
||||
|
||||
# Using a string here means the worker doesn't have to serialize
|
||||
# the configuration object to child processes.
|
||||
# - namespace='CELERY' means all celery-related configuration keys
|
||||
|
@ -107,3 +138,4 @@ CELERY_APP.config_from_object(settings, namespace="CELERY")
|
|||
|
||||
# Load task modules from all registered Django app configs.
|
||||
CELERY_APP.autodiscover_tasks()
|
||||
CELERY_APP.steps["worker"].add(LivenessProbe)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#!/bin/bash -e
|
||||
MODE_FILE="${TMPDIR}/authentik-mode"
|
||||
WORKER_HEARTBEAT="${TMPDIR}/authentik-worker"
|
||||
|
||||
function log {
|
||||
printf '{"event": "%s", "level": "info", "logger": "bootstrap"}\n' "$@" > /dev/stderr
|
||||
|
@ -80,7 +81,13 @@ elif [[ "$1" == "healthcheck" ]]; then
|
|||
if [[ $mode == "server" ]]; then
|
||||
exec curl --user-agent "goauthentik.io lifecycle Healthcheck" -I http://localhost:9000/-/health/ready/
|
||||
elif [[ $mode == "worker" ]]; then
|
||||
exec celery -A authentik.root.celery inspect ping -d celery@$HOSTNAME --timeout 5 -j
|
||||
mtime=$(stat -f %m $WORKER_HEARTBEAT)
|
||||
time=$(date +"%s")
|
||||
if [ "$(( $time - $mtime ))" -gt "30" ]; then
|
||||
log "Worker hasn't updated heartbeat in 30 seconds"
|
||||
exit 1
|
||||
fi
|
||||
exit 0
|
||||
fi
|
||||
elif [[ "$1" == "dump_config" ]]; then
|
||||
exec python -m authentik.lib.config
|
||||
|
|
Reference in New Issue