*: refactor prometheus gauges to directly updating metrics view
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
300ad88447
commit
111fbf119b
|
@ -1,4 +1,6 @@
|
|||
"""authentik admin app config"""
|
||||
from importlib import import_module
|
||||
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
|
@ -13,3 +15,4 @@ class AuthentikAdminConfig(AppConfig):
|
|||
from authentik.admin.tasks import clear_update_notifications
|
||||
|
||||
clear_update_notifications.delay()
|
||||
import_module("authentik.admin.signals")
|
||||
|
|
23
authentik/admin/signals.py
Normal file
23
authentik/admin/signals.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
"""admin signals"""
|
||||
from django.dispatch import receiver
|
||||
|
||||
from authentik.admin.api.tasks import TaskInfo
|
||||
from authentik.admin.api.workers import GAUGE_WORKERS
|
||||
from authentik.root.celery import CELERY_APP
|
||||
from authentik.root.monitoring import monitoring_set
|
||||
|
||||
|
||||
@receiver(monitoring_set)
|
||||
# pylint: disable=unused-argument
|
||||
def monitoring_set_workers(sender, **kwargs):
|
||||
"""Set worker gauge"""
|
||||
count = len(CELERY_APP.control.ping(timeout=0.5))
|
||||
GAUGE_WORKERS.set(count)
|
||||
|
||||
|
||||
@receiver(monitoring_set)
|
||||
# pylint: disable=unused-argument
|
||||
def monitoring_set_tasks(sender, **kwargs):
|
||||
"""Set task gauges"""
|
||||
for task in TaskInfo.all().values():
|
||||
task.set_prom_metrics()
|
|
@ -1,6 +1,7 @@
|
|||
"""authentik core signals"""
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from django.apps import apps
|
||||
from django.contrib.auth.signals import user_logged_in, user_logged_out
|
||||
from django.contrib.sessions.backends.cache import KEY_PREFIX
|
||||
from django.core.cache import cache
|
||||
|
@ -11,6 +12,8 @@ from django.dispatch import receiver
|
|||
from django.http.request import HttpRequest
|
||||
from prometheus_client import Gauge
|
||||
|
||||
from authentik.root.monitoring import monitoring_set
|
||||
|
||||
# Arguments: user: User, password: str
|
||||
password_changed = Signal()
|
||||
|
||||
|
@ -20,6 +23,17 @@ if TYPE_CHECKING:
|
|||
from authentik.core.models import AuthenticatedSession, User
|
||||
|
||||
|
||||
@receiver(monitoring_set)
|
||||
# pylint: disable=unused-argument
|
||||
def monitoring_set_models(sender, **kwargs):
|
||||
"""set models gauges"""
|
||||
for model in apps.get_models():
|
||||
GAUGE_MODELS.labels(
|
||||
model_name=model._meta.model_name,
|
||||
app=model._meta.app_label,
|
||||
).set(model.objects.count())
|
||||
|
||||
|
||||
@receiver(post_save)
|
||||
# pylint: disable=unused-argument
|
||||
def post_save_application(sender: type[Model], instance, created: bool, **_):
|
||||
|
@ -27,11 +41,6 @@ def post_save_application(sender: type[Model], instance, created: bool, **_):
|
|||
from authentik.core.api.applications import user_app_cache_key
|
||||
from authentik.core.models import Application
|
||||
|
||||
GAUGE_MODELS.labels(
|
||||
model_name=sender._meta.model_name,
|
||||
app=sender._meta.app_label,
|
||||
).set(sender.objects.count())
|
||||
|
||||
if sender != Application:
|
||||
return
|
||||
if not created: # pragma: no cover
|
||||
|
|
|
@ -4,7 +4,7 @@ from typing import Any, Optional
|
|||
|
||||
from django.core.cache import cache
|
||||
from django.http import HttpRequest
|
||||
from prometheus_client import Histogram
|
||||
from prometheus_client import Gauge, Histogram
|
||||
from sentry_sdk.hub import Hub
|
||||
from sentry_sdk.tracing import Span
|
||||
from structlog.stdlib import BoundLogger, get_logger
|
||||
|
@ -16,7 +16,6 @@ from authentik.flows.markers import ReevaluateMarker, StageMarker
|
|||
from authentik.flows.models import Flow, FlowStageBinding, Stage
|
||||
from authentik.lib.config import CONFIG
|
||||
from authentik.policies.engine import PolicyEngine
|
||||
from authentik.root.monitoring import UpdatingGauge
|
||||
|
||||
LOGGER = get_logger()
|
||||
PLAN_CONTEXT_PENDING_USER = "pending_user"
|
||||
|
@ -27,10 +26,9 @@ PLAN_CONTEXT_SOURCE = "source"
|
|||
# Is set by the Flow Planner when a FlowToken was used, and the currently active flow plan
|
||||
# was restored.
|
||||
PLAN_CONTEXT_IS_RESTORED = "is_restored"
|
||||
GAUGE_FLOWS_CACHED = UpdatingGauge(
|
||||
GAUGE_FLOWS_CACHED = Gauge(
|
||||
"authentik_flows_cached",
|
||||
"Cached flows",
|
||||
update_func=lambda: len(cache.keys("flow_*") or []),
|
||||
)
|
||||
HIST_FLOWS_PLAN_TIME = Histogram(
|
||||
"authentik_flows_plan_time",
|
||||
|
@ -171,7 +169,6 @@ class FlowPlanner:
|
|||
)
|
||||
plan = self._build_plan(user, request, default_context)
|
||||
cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT)
|
||||
GAUGE_FLOWS_CACHED.update()
|
||||
if not plan.bindings and not self.allow_empty_flows:
|
||||
raise EmptyFlowException()
|
||||
return plan
|
||||
|
|
|
@ -4,6 +4,9 @@ from django.db.models.signals import post_save, pre_delete
|
|||
from django.dispatch import receiver
|
||||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.flows.planner import GAUGE_FLOWS_CACHED
|
||||
from authentik.root.monitoring import monitoring_set
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
|
@ -14,6 +17,13 @@ def delete_cache_prefix(prefix: str) -> int:
|
|||
return len(keys)
|
||||
|
||||
|
||||
@receiver(monitoring_set)
|
||||
# pylint: disable=unused-argument
|
||||
def monitoring_set_flows(sender, **kwargs):
|
||||
"""set flow gauges"""
|
||||
GAUGE_FLOWS_CACHED.set(len(cache.keys("flow_*") or []))
|
||||
|
||||
|
||||
@receiver(post_save)
|
||||
@receiver(pre_delete)
|
||||
# pylint: disable=unused-argument
|
||||
|
|
|
@ -5,7 +5,7 @@ from typing import Iterator, Optional
|
|||
|
||||
from django.core.cache import cache
|
||||
from django.http import HttpRequest
|
||||
from prometheus_client import Histogram
|
||||
from prometheus_client import Gauge, Histogram
|
||||
from sentry_sdk.hub import Hub
|
||||
from sentry_sdk.tracing import Span
|
||||
from structlog.stdlib import BoundLogger, get_logger
|
||||
|
@ -14,13 +14,11 @@ from authentik.core.models import User
|
|||
from authentik.policies.models import Policy, PolicyBinding, PolicyBindingModel, PolicyEngineMode
|
||||
from authentik.policies.process import PolicyProcess, cache_key
|
||||
from authentik.policies.types import PolicyRequest, PolicyResult
|
||||
from authentik.root.monitoring import UpdatingGauge
|
||||
|
||||
CURRENT_PROCESS = current_process()
|
||||
GAUGE_POLICIES_CACHED = UpdatingGauge(
|
||||
GAUGE_POLICIES_CACHED = Gauge(
|
||||
"authentik_policies_cached",
|
||||
"Cached Policies",
|
||||
update_func=lambda: len(cache.keys("policy_*") or []),
|
||||
)
|
||||
HIST_POLICIES_BUILD_TIME = Histogram(
|
||||
"authentik_policies_build_time",
|
||||
|
|
|
@ -5,10 +5,19 @@ from django.dispatch import receiver
|
|||
from structlog.stdlib import get_logger
|
||||
|
||||
from authentik.core.api.applications import user_app_cache_key
|
||||
from authentik.policies.engine import GAUGE_POLICIES_CACHED
|
||||
from authentik.root.monitoring import monitoring_set
|
||||
|
||||
LOGGER = get_logger()
|
||||
|
||||
|
||||
@receiver(monitoring_set)
|
||||
# pylint: disable=unused-argument
|
||||
def monitoring_set_policies(sender, **kwargs):
|
||||
"""set policy gauges"""
|
||||
GAUGE_POLICIES_CACHED.set(len(cache.keys("policy_*") or []))
|
||||
|
||||
|
||||
@receiver(post_save)
|
||||
# pylint: disable=unused-argument
|
||||
def invalidate_policy_cache(sender, instance, **_):
|
||||
|
|
|
@ -1,37 +1,17 @@
|
|||
"""Metrics view"""
|
||||
from base64 import b64encode
|
||||
from typing import Callable
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import connections
|
||||
from django.db.utils import OperationalError
|
||||
from django.dispatch import Signal
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.views import View
|
||||
from django_prometheus.exports import ExportToDjangoView
|
||||
from django_redis import get_redis_connection
|
||||
from prometheus_client import Gauge
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
from authentik.admin.api.workers import GAUGE_WORKERS
|
||||
from authentik.events.monitored_tasks import TaskInfo
|
||||
from authentik.root.celery import CELERY_APP
|
||||
|
||||
|
||||
class UpdatingGauge(Gauge):
|
||||
"""Gauge which fetches its own value from an update function.
|
||||
|
||||
Update function is called on instantiate"""
|
||||
|
||||
def __init__(self, *args, update_func: Callable, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._update_func = update_func
|
||||
self.update()
|
||||
|
||||
def update(self):
|
||||
"""Set value from update function"""
|
||||
val = self._update_func()
|
||||
if val:
|
||||
self.set(val)
|
||||
monitoring_set = Signal()
|
||||
|
||||
|
||||
class MetricsView(View):
|
||||
|
@ -49,11 +29,7 @@ class MetricsView(View):
|
|||
response["WWW-Authenticate"] = 'Basic realm="authentik-monitoring"'
|
||||
return response
|
||||
|
||||
count = len(CELERY_APP.control.ping(timeout=0.5))
|
||||
GAUGE_WORKERS.set(count)
|
||||
|
||||
for task in TaskInfo.all().values():
|
||||
task.set_prom_metrics()
|
||||
monitoring_set.send_robust(self)
|
||||
|
||||
return ExportToDjangoView(request)
|
||||
|
||||
|
|
|
@ -60,3 +60,10 @@ else:
|
|||
|
||||
workers = int(os.environ.get("WORKERS", default_workers))
|
||||
threads = int(os.environ.get("THREADS", 4))
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def worker_exit(server, worker):
|
||||
"""Remove pid dbs when worker is shutdown"""
|
||||
from prometheus_client import multiprocess
|
||||
|
||||
multiprocess.mark_process_dead(worker.pid)
|
||||
|
|
Reference in a new issue