"""authentik events models""" from inspect import getmodule, stack from smtplib import SMTPException from typing import Optional, Union from uuid import uuid4 from django.conf import settings from django.db import models from django.http import HttpRequest from django.utils.translation import gettext as _ from geoip2.errors import GeoIP2Error from requests import RequestException, post from structlog.stdlib import get_logger from authentik import __version__ from authentik.core.middleware import ( SESSION_IMPERSONATE_ORIGINAL_USER, SESSION_IMPERSONATE_USER, ) from authentik.core.models import Group, User from authentik.events.geo import GEOIP_READER from authentik.events.utils import cleanse_dict, get_user, sanitize_dict from authentik.lib.sentry import SentryIgnoredException from authentik.lib.utils.http import get_client_ip from authentik.policies.models import PolicyBindingModel from authentik.stages.email.utils import TemplateEmailMessage LOGGER = get_logger("authentik.events") class NotificationTransportError(SentryIgnoredException): """Error raised when a notification fails to be delivered""" class EventAction(models.TextChoices): """All possible actions to save into the events log""" LOGIN = "login" LOGIN_FAILED = "login_failed" LOGOUT = "logout" USER_WRITE = "user_write" SUSPICIOUS_REQUEST = "suspicious_request" PASSWORD_SET = "password_set" # noqa # nosec SECRET_VIEW = "secret_view" # noqa # nosec INVITE_USED = "invitation_used" AUTHORIZE_APPLICATION = "authorize_application" SOURCE_LINKED = "source_linked" IMPERSONATION_STARTED = "impersonation_started" IMPERSONATION_ENDED = "impersonation_ended" POLICY_EXECUTION = "policy_execution" POLICY_EXCEPTION = "policy_exception" PROPERTY_MAPPING_EXCEPTION = "property_mapping_exception" SYSTEM_TASK_EXECUTION = "system_task_execution" SYSTEM_TASK_EXCEPTION = "system_task_exception" CONFIGURATION_ERROR = "configuration_error" MODEL_CREATED = "model_created" MODEL_UPDATED = "model_updated" MODEL_DELETED = "model_deleted" UPDATE_AVAILABLE = "update_available" CUSTOM_PREFIX = "custom_" class Event(models.Model): """An individual Audit/Metrics/Notification/Error Event""" event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) user = models.JSONField(default=dict) action = models.TextField(choices=EventAction.choices) app = models.TextField() context = models.JSONField(default=dict, blank=True) client_ip = models.GenericIPAddressField(null=True) created = models.DateTimeField(auto_now_add=True) @staticmethod def _get_app_from_request(request: HttpRequest) -> str: if not isinstance(request, HttpRequest): return "" return request.resolver_match.app_name @staticmethod def new( action: Union[str, EventAction], app: Optional[str] = None, _inspect_offset: int = 1, **kwargs, ) -> "Event": """Create new Event instance from arguments. Instance is NOT saved.""" if not isinstance(action, EventAction): action = EventAction.CUSTOM_PREFIX + action if not app: app = getmodule(stack()[_inspect_offset][0]).__name__ cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs)) event = Event(action=action, app=app, context=cleaned_kwargs) return event def set_user(self, user: User) -> "Event": """Set `.user` based on user, ensuring the correct attributes are copied. This should only be used when self.from_http is *not* used.""" self.user = get_user(user) return self def from_http( self, request: HttpRequest, user: Optional[settings.AUTH_USER_MODEL] = None ) -> "Event": """Add data from a Django-HttpRequest, allowing the creation of Events independently from requests. `user` arguments optionally overrides user from requests.""" if hasattr(request, "user"): original_user = None if hasattr(request, "session"): original_user = request.session.get( SESSION_IMPERSONATE_ORIGINAL_USER, None ) self.user = get_user(request.user, original_user) if user: self.user = get_user(user) # Check if we're currently impersonating, and add that user if hasattr(request, "session"): if SESSION_IMPERSONATE_ORIGINAL_USER in request.session: self.user = get_user(request.session[SESSION_IMPERSONATE_ORIGINAL_USER]) self.user["on_behalf_of"] = get_user( request.session[SESSION_IMPERSONATE_USER] ) # User 255.255.255.255 as fallback if IP cannot be determined self.client_ip = get_client_ip(request) or "255.255.255.255" # Apply GeoIP Data, when enabled self.with_geoip() # If there's no app set, we get it from the requests too if not self.app: self.app = Event._get_app_from_request(request) self.save() return self def with_geoip(self): """Apply GeoIP Data, when enabled""" if not GEOIP_READER: return try: response = GEOIP_READER.city(self.client_ip) self.context["geo"] = { "continent": response.continent.code, "country": response.country.iso_code, "lat": response.location.latitude, "long": response.location.longitude, } if response.city.name: self.context["geo"]["city"] = response.city.name except GeoIP2Error as exc: LOGGER.warning("Failed to add geoIP Data to event", exc=exc) def save(self, *args, **kwargs): if self._state.adding: LOGGER.debug( "Created Event", action=self.action, context=self.context, client_ip=self.client_ip, user=self.user, ) return super().save(*args, **kwargs) @property def summary(self) -> str: """Return a summary of this event.""" if "message" in self.context: return self.context["message"] return f"{self.action}: {self.context}" def __str__(self) -> str: return f"" class Meta: verbose_name = _("Event") verbose_name_plural = _("Events") class TransportMode(models.TextChoices): """Modes that a notification transport can send a notification""" WEBHOOK = "webhook", _("Generic Webhook") WEBHOOK_SLACK = "webhook_slack", _("Slack Webhook (Slack/Discord)") EMAIL = "email", _("Email") class NotificationTransport(models.Model): """Action which is executed when a Rule matches""" uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) name = models.TextField(unique=True) mode = models.TextField(choices=TransportMode.choices) webhook_url = models.TextField(blank=True) send_once = models.BooleanField( default=False, help_text=_( "Only send notification once, for example when sending a webhook into a chat channel." ), ) def send(self, notification: "Notification") -> list[str]: """Send notification to user, called from async task""" if self.mode == TransportMode.WEBHOOK: return self.send_webhook(notification) if self.mode == TransportMode.WEBHOOK_SLACK: return self.send_webhook_slack(notification) if self.mode == TransportMode.EMAIL: return self.send_email(notification) raise ValueError(f"Invalid mode {self.mode} set") def send_webhook(self, notification: "Notification") -> list[str]: """Send notification to generic webhook""" try: response = post( self.webhook_url, json={ "body": notification.body, "severity": notification.severity, "user_email": notification.user.email, "user_username": notification.user.username, }, ) response.raise_for_status() except RequestException as exc: raise NotificationTransportError(exc.response.text) from exc return [ response.status_code, response.text, ] def send_webhook_slack(self, notification: "Notification") -> list[str]: """Send notification to slack or slack-compatible endpoints""" fields = [ { "title": _("Severity"), "value": notification.severity, "short": True, }, { "title": _("Dispatched for user"), "value": str(notification.user), "short": True, }, ] if notification.event: for key, value in notification.event.context.items(): if not isinstance(value, str): continue # https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html if len(fields) >= 25: continue fields.append({"title": key[:256], "value": value[:1024]}) body = { "username": "authentik", "icon_url": "https://goauthentik.io/img/icon.png", "attachments": [ { "author_name": "authentik", "author_link": "https://goauthentik.io", "author_icon": "https://goauthentik.io/img/icon.png", "title": notification.body, "color": "#fd4b2d", "fields": fields, "footer": f"authentik v{__version__}", } ], } if notification.event: body["attachments"][0]["title"] = notification.event.action try: response = post(self.webhook_url, json=body) response.raise_for_status() except RequestException as exc: raise NotificationTransportError(exc.response.text) from exc return [ response.status_code, response.text, ] def send_email(self, notification: "Notification") -> list[str]: """Send notification via global email configuration""" subject = "authentik Notification: " key_value = {} if notification.event: subject += notification.event.action for key, value in notification.event.context.items(): if not isinstance(value, str): continue key_value[key] = value else: subject += notification.body[:75] mail = TemplateEmailMessage( subject=subject, template_name="email/generic.html", to=[notification.user.email], template_context={ "title": subject, "body": notification.body, "key_value": key_value, }, ) # Email is sent directly here, as the call to send() should have been from a task. try: from authentik.stages.email.tasks import send_mail # pyright: reportGeneralTypeIssues=false return send_mail(mail.__dict__) # pylint: disable=no-value-for-parameter except (SMTPException, ConnectionError, OSError) as exc: raise NotificationTransportError from exc def __str__(self) -> str: return f"Notification Transport {self.name}" class Meta: verbose_name = _("Notification Transport") verbose_name_plural = _("Notification Transports") class NotificationSeverity(models.TextChoices): """Severity images that a notification can have""" NOTICE = "notice", _("Notice") WARNING = "warning", _("Warning") ALERT = "alert", _("Alert") class Notification(models.Model): """Event Notification""" uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) severity = models.TextField(choices=NotificationSeverity.choices) body = models.TextField() created = models.DateTimeField(auto_now_add=True) event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True) seen = models.BooleanField(default=False) user = models.ForeignKey(User, on_delete=models.CASCADE) def __str__(self) -> str: body_trunc = (self.body[:75] + "..") if len(self.body) > 75 else self.body return f"Notification for user {self.user}: {body_trunc}" class Meta: verbose_name = _("Notification") verbose_name_plural = _("Notifications") class NotificationRule(PolicyBindingModel): """Decide when to create a Notification based on policies attached to this object.""" name = models.TextField(unique=True) transports = models.ManyToManyField( NotificationTransport, help_text=_( ( "Select which transports should be used to notify the user. If none are " "selected, the notification will only be shown in the authentik UI." ) ), ) severity = models.TextField( choices=NotificationSeverity.choices, default=NotificationSeverity.NOTICE, help_text=_( "Controls which severity level the created notifications will have." ), ) group = models.ForeignKey( Group, help_text=_( ( "Define which group of users this notification should be sent and shown to. " "If left empty, Notification won't ben sent." ) ), null=True, blank=True, on_delete=models.SET_NULL, ) def __str__(self) -> str: return f"Notification Rule {self.name}" class Meta: verbose_name = _("Notification Rule") verbose_name_plural = _("Notification Rules")