diff --git a/authentik/admin/api/tasks.py b/authentik/admin/api/tasks.py
index cfce015ee..dd6932cc2 100644
--- a/authentik/admin/api/tasks.py
+++ b/authentik/admin/api/tasks.py
@@ -22,7 +22,7 @@ class TaskSerializer(PassiveSerializer):
 
     task_name = CharField()
     task_description = CharField()
-    task_finish_timestamp = DateTimeField(source="finish_timestamp")
+    task_finish_timestamp = DateTimeField(source="finish_time")
 
     status = ChoiceField(
         source="result.status.name",
diff --git a/authentik/admin/api/workers.py b/authentik/admin/api/workers.py
index 4433fecd9..ff9b7c5e2 100644
--- a/authentik/admin/api/workers.py
+++ b/authentik/admin/api/workers.py
@@ -1,5 +1,6 @@
 """authentik administration overview"""
 from drf_spectacular.utils import extend_schema, inline_serializer
+from prometheus_client import Gauge
 from rest_framework.fields import IntegerField
 from rest_framework.permissions import IsAdminUser
 from rest_framework.request import Request
@@ -8,6 +9,8 @@ from rest_framework.views import APIView
 
 from authentik.root.celery import CELERY_APP
 
+GAUGE_WORKERS = Gauge("authentik_admin_workers", "Currently connected workers")
+
 
 class WorkerView(APIView):
     """Get currently connected worker count."""
@@ -19,4 +22,5 @@ class WorkerView(APIView):
     )
     def get(self, request: Request) -> Response:
         """Get currently connected worker count."""
-        return Response({"count": len(CELERY_APP.control.ping(timeout=0.5))})
+        count = len(CELERY_APP.control.ping(timeout=0.5))
+        return Response({"count": count})
diff --git a/authentik/admin/tasks.py b/authentik/admin/tasks.py
index dcb52601a..26fe1bbe3 100644
--- a/authentik/admin/tasks.py
+++ b/authentik/admin/tasks.py
@@ -1,13 +1,15 @@
 """authentik admin tasks"""
 import re
+from os import environ
 
 from django.core.cache import cache
 from django.core.validators import URLValidator
 from packaging.version import parse
+from prometheus_client import Info
 from requests import RequestException, get
 from structlog.stdlib import get_logger
 
-from authentik import __version__
+from authentik import ENV_GIT_HASH_KEY, __version__
 from authentik.events.models import Event, EventAction
 from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
 from authentik.root.celery import CELERY_APP
@@ -17,6 +19,18 @@ VERSION_CACHE_KEY = "authentik_latest_version"
 VERSION_CACHE_TIMEOUT = 8 * 60 * 60  # 8 hours
 # Chop of the first ^ because we want to search the entire string
 URL_FINDER = URLValidator.regex.pattern[1:]
+PROM_INFO = Info("authentik_version", "Currently running authentik version")
+
+
+def _set_prom_info():
+    """Set prometheus info for version"""
+    PROM_INFO.info(
+        {
+            "version": __version__,
+            "latest": cache.get(VERSION_CACHE_KEY, ""),
+            "build_hash": environ.get(ENV_GIT_HASH_KEY, ""),
+        }
+    )
 
 
 @CELERY_APP.task(bind=True, base=MonitoredTask)
@@ -36,6 +50,7 @@ def update_latest_version(self: MonitoredTask):
                 TaskResultStatus.SUCCESSFUL, ["Successfully updated latest Version"]
             )
         )
+        _set_prom_info()
         # Check if upstream version is newer than what we're running,
         # and if no event exists yet, create one.
         local_version = parse(__version__)
@@ -53,3 +68,6 @@ def update_latest_version(self: MonitoredTask):
     except (RequestException, IndexError) as exc:
         cache.set(VERSION_CACHE_KEY, "0.0.0", VERSION_CACHE_TIMEOUT)
         self.set_status(TaskResult(TaskResultStatus.ERROR).with_error(exc))
+
+
+_set_prom_info()
diff --git a/authentik/core/apps.py b/authentik/core/apps.py
index fe5660cca..ef222669f 100644
--- a/authentik/core/apps.py
+++ b/authentik/core/apps.py
@@ -2,6 +2,10 @@
 from importlib import import_module
 
 from django.apps import AppConfig
+from django.db import ProgrammingError
+
+from authentik.core.signals import GAUGE_MODELS
+from authentik.lib.utils.reflection import get_apps
 
 
 class AuthentikCoreConfig(AppConfig):
@@ -15,3 +19,12 @@ class AuthentikCoreConfig(AppConfig):
     def ready(self):
         import_module("authentik.core.signals")
         import_module("authentik.core.managed")
+        try:
+            for app in get_apps():
+                for model in app.get_models():
+                    GAUGE_MODELS.labels(
+                        model_name=model._meta.model_name,
+                        app=model._meta.app_label,
+                    ).set(model.objects.count())
+        except ProgrammingError:
+            pass
diff --git a/authentik/core/signals.py b/authentik/core/signals.py
index ec7675fe2..089ed7bb1 100644
--- a/authentik/core/signals.py
+++ b/authentik/core/signals.py
@@ -1,20 +1,31 @@
 """authentik core signals"""
 from django.core.cache import cache
 from django.core.signals import Signal
+from django.db.models import Model
 from django.db.models.signals import post_save
 from django.dispatch import receiver
+from prometheus_client import Gauge
 
 # Arguments: user: User, password: str
 password_changed = Signal()
 
+GAUGE_MODELS = Gauge(
+    "authentik_models", "Count of various objects", ["model_name", "app"]
+)
+
 
 @receiver(post_save)
 # pylint: disable=unused-argument
-def post_save_application(sender, instance, created: bool, **_):
+def post_save_application(sender: type[Model], instance, created: bool, **_):
     """Clear user's application cache upon application creation"""
     from authentik.core.api.applications import user_app_cache_key
     from authentik.core.models import Application
 
+    GAUGE_MODELS.labels(
+        model_name=sender._meta.model_name,
+        app=sender._meta.app_label,
+    ).set(sender.objects.count())
+
     if sender != Application:
         return
     if not created:  # pragma: no cover
diff --git a/authentik/events/apps.py b/authentik/events/apps.py
index ad9e7d205..f0eb77c9c 100644
--- a/authentik/events/apps.py
+++ b/authentik/events/apps.py
@@ -1,7 +1,10 @@
 """authentik events app"""
+from datetime import timedelta
 from importlib import import_module
 
 from django.apps import AppConfig
+from django.db import ProgrammingError
+from django.utils.timezone import datetime
 
 
 class AuthentikEventsConfig(AppConfig):
@@ -13,3 +16,12 @@ class AuthentikEventsConfig(AppConfig):
 
     def ready(self):
         import_module("authentik.events.signals")
+        try:
+            from authentik.events.models import Event
+
+            date_from = datetime.now() - timedelta(days=1)
+
+            for event in Event.objects.filter(created__gte=date_from):
+                event._set_prom_metrics()
+        except ProgrammingError:
+            pass
diff --git a/authentik/events/models.py b/authentik/events/models.py
index cc41586af..55bb64d98 100644
--- a/authentik/events/models.py
+++ b/authentik/events/models.py
@@ -11,6 +11,7 @@ from django.http import HttpRequest
 from django.utils.timezone import now
 from django.utils.translation import gettext as _
 from geoip2.errors import GeoIP2Error
+from prometheus_client import Gauge
 from requests import RequestException, post
 from structlog.stdlib import get_logger
 
@@ -28,6 +29,11 @@ from authentik.policies.models import PolicyBindingModel
 from authentik.stages.email.utils import TemplateEmailMessage
 
 LOGGER = get_logger("authentik.events")
+GAUGE_EVENTS = Gauge(
+    "authentik_events",
+    "Events in authentik",
+    ["action", "user_username", "app", "client_ip"],
+)
 
 
 def default_event_duration():
@@ -169,6 +175,14 @@ class Event(ExpiringModel):
         except GeoIP2Error as exc:
             LOGGER.warning("Failed to add geoIP Data to event", exc=exc)
 
+    def _set_prom_metrics(self):
+        GAUGE_EVENTS.labels(
+            action=self.action,
+            user_username=self.user.get("username"),
+            app=self.app,
+            client_ip=self.client_ip,
+        ).set(self.created.timestamp())
+
     def save(self, *args, **kwargs):
         if self._state.adding:
             LOGGER.debug(
@@ -178,7 +192,8 @@ class Event(ExpiringModel):
                 client_ip=self.client_ip,
                 user=self.user,
             )
-        return super().save(*args, **kwargs)
+        super().save(*args, **kwargs)
+        self._set_prom_metrics()
 
     @property
     def summary(self) -> str:
diff --git a/authentik/events/monitored_tasks.py b/authentik/events/monitored_tasks.py
index 456eb2468..d3a269aed 100644
--- a/authentik/events/monitored_tasks.py
+++ b/authentik/events/monitored_tasks.py
@@ -2,14 +2,22 @@
 from dataclasses import dataclass, field
 from datetime import datetime
 from enum import Enum
+from timeit import default_timer
 from traceback import format_tb
 from typing import Any, Optional
 
 from celery import Task
 from django.core.cache import cache
+from prometheus_client import Gauge
 
 from authentik.events.models import Event, EventAction
 
+GAUGE_TASKS = Gauge(
+    "authentik_system_tasks",
+    "System tasks and their status",
+    ["task_name", "task_uid", "status"],
+)
+
 
 class TaskResultStatus(Enum):
     """Possible states of tasks"""
@@ -43,7 +51,9 @@ class TaskInfo:
     """Info about a task run"""
 
     task_name: str
-    finish_timestamp: datetime
+    start_timestamp: float
+    finish_timestamp: float
+    finish_time: datetime
 
     result: TaskResult
 
@@ -73,12 +83,25 @@ class TaskInfo:
         """Delete task info from cache"""
         return cache.delete(f"task_{self.task_name}")
 
+    def set_prom_metrics(self):
+        """Update prometheus metrics"""
+        start = default_timer()
+        if hasattr(self, "start_timestamp"):
+            start = self.start_timestamp
+        duration = max(self.finish_timestamp - start, 0)
+        GAUGE_TASKS.labels(
+            task_name=self.task_name,
+            task_uid=self.result.uid or "",
+            status=self.result.status,
+        ).set(duration)
+
     def save(self, timeout_hours=6):
         """Save task into cache"""
         key = f"task_{self.task_name}"
         if self.result.uid:
             key += f"_{self.result.uid}"
             self.task_name += f"_{self.result.uid}"
+        self.set_prom_metrics()
         cache.set(key, self, timeout=timeout_hours * 60 * 60)
 
 
@@ -98,6 +121,7 @@ class MonitoredTask(Task):
         self._uid = None
         self._result = TaskResult(status=TaskResultStatus.ERROR, messages=[])
         self.result_timeout_hours = 6
+        self.start = default_timer()
 
     def set_uid(self, uid: str):
         """Set UID, so in the case of an unexpected error its saved correctly"""
@@ -117,7 +141,9 @@ class MonitoredTask(Task):
             TaskInfo(
                 task_name=self.__name__,
                 task_description=self.__doc__,
-                finish_timestamp=datetime.now(),
+                start_timestamp=self.start,
+                finish_timestamp=default_timer(),
+                finish_time=datetime.now(),
                 result=self._result,
                 task_call_module=self.__module__,
                 task_call_func=self.__name__,
@@ -133,7 +159,9 @@ class MonitoredTask(Task):
             TaskInfo(
                 task_name=self.__name__,
                 task_description=self.__doc__,
-                finish_timestamp=datetime.now(),
+                start_timestamp=self.start,
+                finish_timestamp=default_timer(),
+                finish_time=datetime.now(),
                 result=self._result,
                 task_call_module=self.__module__,
                 task_call_func=self.__name__,
@@ -151,3 +179,7 @@ class MonitoredTask(Task):
 
     def run(self, *args, **kwargs):
         raise NotImplementedError
+
+
+for task in TaskInfo.all().values():
+    task.set_prom_metrics()
diff --git a/authentik/flows/planner.py b/authentik/flows/planner.py
index f66f02dc6..718f64cfa 100644
--- a/authentik/flows/planner.py
+++ b/authentik/flows/planner.py
@@ -4,6 +4,7 @@ from typing import Any, Optional
 
 from django.core.cache import cache
 from django.http import HttpRequest
+from prometheus_client import Histogram
 from sentry_sdk.hub import Hub
 from sentry_sdk.tracing import Span
 from structlog.stdlib import BoundLogger, get_logger
@@ -14,6 +15,7 @@ from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableExce
 from authentik.flows.markers import ReevaluateMarker, StageMarker
 from authentik.flows.models import Flow, FlowStageBinding, Stage
 from authentik.policies.engine import PolicyEngine
+from authentik.root.monitoring import UpdatingGauge
 
 LOGGER = get_logger()
 PLAN_CONTEXT_PENDING_USER = "pending_user"
@@ -21,6 +23,16 @@ PLAN_CONTEXT_SSO = "is_sso"
 PLAN_CONTEXT_REDIRECT = "redirect"
 PLAN_CONTEXT_APPLICATION = "application"
 PLAN_CONTEXT_SOURCE = "source"
+GAUGE_FLOWS_CACHED = UpdatingGauge(
+    "authentik_flows_cached",
+    "Cached flows",
+    update_func=lambda: len(cache.keys("flow_*")),
+)
+HIST_FLOWS_PLAN_TIME = Histogram(
+    "authentik_flows_plan_time",
+    "Duration to build a plan for a flow",
+    ["flow_slug"],
+)
 
 
 def cache_key(flow: Flow, user: Optional[User] = None) -> str:
@@ -146,6 +158,7 @@ class FlowPlanner:
             )
             plan = self._build_plan(user, request, default_context)
             cache.set(cache_key(self.flow, user), plan)
+            GAUGE_FLOWS_CACHED.update()
         if not plan.stages and not self.allow_empty_flows:
             raise EmptyFlowException()
         return plan
@@ -158,7 +171,9 @@ class FlowPlanner:
     ) -> FlowPlan:
         """Build flow plan by checking each stage in their respective
         order and checking the applied policies"""
-        with Hub.current.start_span(op="flow.planner.build_plan") as span:
+        with Hub.current.start_span(
+            op="flow.planner.build_plan"
+        ) as span, HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time():
             span: Span
             span.set_data("flow", self.flow)
             span.set_data("user", user)
@@ -202,6 +217,7 @@ class FlowPlanner:
                     marker = ReevaluateMarker(binding=binding, user=user)
                 if stage:
                     plan.append(stage, marker)
+            HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug)
             self._logger.debug(
                 "f(plan): finished building",
             )
diff --git a/authentik/outposts/channels.py b/authentik/outposts/channels.py
index d0dad47b8..3e46dceed 100644
--- a/authentik/outposts/channels.py
+++ b/authentik/outposts/channels.py
@@ -8,11 +8,21 @@ from channels.exceptions import DenyConnection
 from dacite import from_dict
 from dacite.data import Data
 from guardian.shortcuts import get_objects_for_user
+from prometheus_client import Gauge
 from structlog.stdlib import get_logger
 
 from authentik.core.channels import AuthJsonConsumer
 from authentik.outposts.models import OUTPOST_HELLO_INTERVAL, Outpost, OutpostState
 
+GAUGE_OUTPOSTS_CONNECTED = Gauge(
+    "authentik_outposts_connected", "Currently connected outposts", ["outpost", "uid"]
+)
+GAUGE_OUTPOSTS_LAST_UPDATE = Gauge(
+    "authentik_outposts_last_update",
+    "Last update from any outpost",
+    ["outpost", "uid", "version"],
+)
+
 LOGGER = get_logger()
 
 
@@ -44,6 +54,8 @@ class OutpostConsumer(AuthJsonConsumer):
 
     last_uid: Optional[str] = None
 
+    first_msg = False
+
     def connect(self):
         super().connect()
         uuid = self.scope["url_route"]["kwargs"]["pk"]
@@ -68,6 +80,10 @@ class OutpostConsumer(AuthJsonConsumer):
             if self.channel_name in state.channel_ids:
                 state.channel_ids.remove(self.channel_name)
                 state.save()
+            GAUGE_OUTPOSTS_CONNECTED.labels(
+                outpost=self.outpost.name,
+                uid=self.last_uid,
+            ).dec()
         LOGGER.debug(
             "removed outpost instance from cache",
             outpost=self.outpost,
@@ -78,15 +94,29 @@ class OutpostConsumer(AuthJsonConsumer):
         msg = from_dict(WebsocketMessage, content)
         uid = msg.args.get("uuid", self.channel_name)
         self.last_uid = uid
+
         state = OutpostState.for_instance_uid(self.outpost, uid)
         if self.channel_name not in state.channel_ids:
             state.channel_ids.append(self.channel_name)
         state.last_seen = datetime.now()
+
+        if not self.first_msg:
+            GAUGE_OUTPOSTS_CONNECTED.labels(
+                outpost=self.outpost.name,
+                uid=self.last_uid,
+            ).inc()
+            self.first_msg = True
+
         if msg.instruction == WebsocketMessageInstruction.HELLO:
             state.version = msg.args.get("version", None)
             state.build_hash = msg.args.get("buildHash", "")
         elif msg.instruction == WebsocketMessageInstruction.ACK:
             return
+        GAUGE_OUTPOSTS_LAST_UPDATE.labels(
+            outpost=self.outpost.name,
+            uid=self.last_uid or "",
+            version=state.version or "",
+        ).set_to_current_time()
         state.save(timeout=OUTPOST_HELLO_INTERVAL * 1.5)
 
         response = WebsocketMessage(instruction=WebsocketMessageInstruction.ACK)
diff --git a/authentik/policies/engine.py b/authentik/policies/engine.py
index b8ac67fa2..99f27b1d5 100644
--- a/authentik/policies/engine.py
+++ b/authentik/policies/engine.py
@@ -5,6 +5,7 @@ from typing import Iterator, Optional
 
 from django.core.cache import cache
 from django.http import HttpRequest
+from prometheus_client import Histogram
 from sentry_sdk.hub import Hub
 from sentry_sdk.tracing import Span
 from structlog.stdlib import BoundLogger, get_logger
@@ -18,8 +19,19 @@ from authentik.policies.models import (
 )
 from authentik.policies.process import PolicyProcess, cache_key
 from authentik.policies.types import PolicyRequest, PolicyResult
+from authentik.root.monitoring import UpdatingGauge
 
 CURRENT_PROCESS = current_process()
+GAUGE_POLICIES_CACHED = UpdatingGauge(
+    "authentik_policies_cached",
+    "Cached Policies",
+    update_func=lambda: len(cache.keys("policy_*")),
+)
+HIST_POLICIES_BUILD_TIME = Histogram(
+    "authentik_policies_build_time",
+    "Execution times complete policy result to an object",
+    ["object_name", "object_type", "user"],
+)
 
 
 class PolicyProcessInfo:
@@ -92,7 +104,13 @@ class PolicyEngine:
 
     def build(self) -> "PolicyEngine":
         """Build wrapper which monitors performance"""
-        with Hub.current.start_span(op="policy.engine.build") as span:
+        with Hub.current.start_span(
+            op="policy.engine.build"
+        ) as span, HIST_POLICIES_BUILD_TIME.labels(
+            object_name=self.__pbm,
+            object_type=f"{self.__pbm._meta.app_label}.{self.__pbm._meta.model_name}",
+            user=self.request.user,
+        ).time():
             span: Span
             span.set_data("pbm", self.__pbm)
             span.set_data("request", self.request)
diff --git a/authentik/policies/models.py b/authentik/policies/models.py
index 084a0ca9f..380e6eaa6 100644
--- a/authentik/policies/models.py
+++ b/authentik/policies/models.py
@@ -111,14 +111,30 @@ class PolicyBinding(SerializerModel):
 
         return PolicyBindingSerializer
 
-    def __str__(self) -> str:
-        suffix = ""
+    @property
+    def target_type(self) -> str:
+        """Get the target type this binding is applied to"""
         if self.policy:
-            suffix = f"Policy {self.policy.name}"
+            return "policy"
         if self.group:
-            suffix = f"Group {self.group.name}"
+            return "group"
         if self.user:
-            suffix = f"User {self.user.name}"
+            return "user"
+        return "invalid"
+
+    @property
+    def target_name(self) -> str:
+        """Get the target name this binding is applied to"""
+        if self.policy:
+            return self.policy.name
+        if self.group:
+            return self.group.name
+        if self.user:
+            return self.user.name
+        return "invalid"
+
+    def __str__(self) -> str:
+        suffix = f"{self.target_type.title()} {self.target_name}"
         try:
             return f"Binding from {self.target} #{self.order} to {suffix}"
         except PolicyBinding.target.RelatedObjectDoesNotExist:  # pylint: disable=no-member
diff --git a/authentik/policies/process.py b/authentik/policies/process.py
index 263881ca5..cdf859c7f 100644
--- a/authentik/policies/process.py
+++ b/authentik/policies/process.py
@@ -5,6 +5,7 @@ from traceback import format_tb
 from typing import Optional
 
 from django.core.cache import cache
+from prometheus_client import Histogram
 from sentry_sdk.hub import Hub
 from sentry_sdk.tracing import Span
 from structlog.stdlib import get_logger
@@ -19,6 +20,18 @@ TRACEBACK_HEADER = "Traceback (most recent call last):\n"
 FORK_CTX = get_context("fork")
 PROCESS_CLASS = FORK_CTX.Process
+HIST_POLICIES_EXECUTION_TIME = Histogram(
+    "authentik_policies_execution_time",
+    "Execution times for single policies",
+    [
+        "binding_order",
+        "binding_target_type",
+        "binding_target_name",
+        "object_name",
+        "object_type",
+        "user",
+    ],
+)
 
 
 def cache_key(binding: PolicyBinding, request: PolicyRequest) -> str:
@@ -121,7 +134,14 @@ class PolicyProcess(PROCESS_CLASS):
         """Task wrapper to run policy checking"""
         with Hub.current.start_span(
             op="policy.process.execute",
-        ) as span:
+        ) as span, HIST_POLICIES_EXECUTION_TIME.labels(
+            binding_order=self.binding.order,
+            binding_target_type=self.binding.target_type,
+            binding_target_name=self.binding.target_name,
+            object_name=self.request.obj,
+            object_type=f"{self.request.obj._meta.app_label}.{self.request.obj._meta.model_name}",
+            user=str(self.request.user),
+        ).time():
             span: Span
             span.set_data("policy", self.binding.policy)
             span.set_data("request", self.request)
diff --git a/authentik/root/monitoring.py b/authentik/root/monitoring.py
index 755ff9842..c7b97ac94 100644
--- a/authentik/root/monitoring.py
+++ b/authentik/root/monitoring.py
@@ -1,5 +1,6 @@
 """Metrics view"""
 from base64 import b64encode
+from typing import Callable
 
 from django.conf import settings
 from django.db import connections
@@ -8,8 +9,30 @@ from django.http import HttpRequest, HttpResponse
 from django.views import View
 from django_prometheus.exports import ExportToDjangoView
 from django_redis import get_redis_connection
+from prometheus_client import Gauge
 from redis.exceptions import RedisError
 
+from authentik.admin.api.workers import GAUGE_WORKERS
+from authentik.events.monitored_tasks import TaskInfo
+from authentik.root.celery import CELERY_APP
+
+
+class UpdatingGauge(Gauge):
+    """Gauge which fetches its own value from an update function.
+
+    Update function is called on instantiate"""
+
+    def __init__(self, *args, update_func: Callable, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._update_func = update_func
+        self.update()
+
+    def update(self):
+        """Set value from update function"""
+        val = self._update_func()
+        if val:
+            self.set(val)
+
 
 class MetricsView(View):
     """Wrapper around ExportToDjangoView, using http-basic auth"""
@@ -20,12 +43,18 @@ class MetricsView(View):
         auth_type, _, given_credentials = auth_header.partition(" ")
         credentials = f"monitor:{settings.SECRET_KEY}"
         expected = b64encode(str.encode(credentials)).decode()
-
-        if auth_type != "Basic" or given_credentials != expected:
+        authed = auth_type == "Basic" and given_credentials == expected
+        if not authed and not settings.DEBUG:
             response = HttpResponse(status=401)
             response["WWW-Authenticate"] = 'Basic realm="authentik-monitoring"'
             return response
 
+        count = len(CELERY_APP.control.ping(timeout=0.5))
+        GAUGE_WORKERS.set(count)
+
+        for task in TaskInfo.all().values():
+            task.set_prom_metrics()
+
         return ExportToDjangoView(request)
diff --git a/authentik/root/settings.py b/authentik/root/settings.py
index 039e6bef6..3b1e92be2 100644
--- a/authentik/root/settings.py
+++ b/authentik/root/settings.py
@@ -256,7 +256,7 @@ CHANNEL_LAYERS = {
 
 DATABASES = {
     "default": {
-        "ENGINE": "django.db.backends.postgresql",
+        "ENGINE": "django_prometheus.db.backends.postgresql",
         "HOST": CONFIG.y("postgresql.host"),
         "NAME": CONFIG.y("postgresql.name"),
         "USER": CONFIG.y("postgresql.user"),
@@ -334,6 +334,10 @@ CELERY_RESULT_BACKEND = (
 DBBACKUP_STORAGE = "django.core.files.storage.FileSystemStorage"
 DBBACKUP_STORAGE_OPTIONS = {"location": "./backups" if DEBUG else "/backups"}
 DBBACKUP_FILENAME_TEMPLATE = "authentik-backup-{datetime}.sql"
+DBBACKUP_CONNECTOR_MAPPING = {
+    "django_prometheus.db.backends.postgresql": "dbbackup.db.postgresql.PgDumpConnector",
+}
+
 if CONFIG.y("postgresql.s3_backup"):
     DBBACKUP_STORAGE = "storages.backends.s3boto3.S3Boto3Storage"
    DBBACKUP_STORAGE_OPTIONS = {
diff --git a/schema.yml b/schema.yml
index a1446dc34..3b0837b7a 100644
--- a/schema.yml
+++ b/schema.yml
@@ -1,7 +1,7 @@
 openapi: 3.0.3
 info:
   title: authentik
-  version: 2021.5.3
+  version: 2021.5.4
   description: Making authentication simple.
   contact:
     email: hello@beryju.org
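
A quick way to sanity-check the instrumentation added above is to request the metrics view with the HTTP basic-auth credentials that MetricsView derives from SECRET_KEY ("monitor:<SECRET_KEY>"). The Python sketch below is illustrative only: the /metrics/ path and the localhost:9000 host/port are assumptions that are not defined anywhere in this diff and depend on the deployment's URL configuration.

# Minimal sketch for manually verifying the new Prometheus metrics.
# Assumptions (not part of this diff): authentik is reachable on
# localhost:9000 and the metrics view is routed at /metrics/.
import os

import requests
from requests.auth import HTTPBasicAuth

secret_key = os.environ["AUTHENTIK_SECRET_KEY"]  # must match settings.SECRET_KEY

response = requests.get(
    "http://localhost:9000/metrics/",
    auth=HTTPBasicAuth("monitor", secret_key),  # MetricsView compares against "monitor:<SECRET_KEY>"
    timeout=5,
)
response.raise_for_status()

# Show only the metric families introduced by this change.
prefixes = (
    "authentik_admin_workers",
    "authentik_version",
    "authentik_models",
    "authentik_events",
    "authentik_system_tasks",
    "authentik_flows",
    "authentik_policies",
    "authentik_outposts",
)
for line in response.text.splitlines():
    if line.startswith(prefixes):
        print(line)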