This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/authentik/events/utils.py
Jens L 1ccf6dcf6f
events: Notifications (#418)
* events: initial alerting implementation

* policies: move error handling to process, ensure policy UUID is saved

* policies: add tests for error handling in PolicyProcess

* events: improve loop detection

* events: add API for action and trigger

* policies: ensure http_request is not used in context

* events: adjust unittests for user handling

* policies/event_matcher: add policy type

* events: add API tests

* events: add middleware tests

* core: make application's provider not required

* outposts: allow blank kubeconfig

* outposts: validate kubeconfig before saving

* api: fix formatting

* stages/invitation: remove invitation_created signal as model_created functions the same

* stages/invitation: ensure created_by is set when creating from API

* events: rebase migrations on master

* events: fix missing Alerts from API

* policies: fix unittests

* events: add tests for alerts

* events: rename from alerting to notifications

* events: add ability to specify severity of notification created

* policies/event_matcher: Add app field to match on event app

* policies/event_matcher: fix EventMatcher not being included in API

* core: use objects.none() when get_queryset is used

* events: use m2m for multiple transports, create notification object in task

* events: add default triggers

* events: fix migrations return value

* events: fix notification_transport not being in the correct queue

* stages/email: allow sending of email without backend

* events: implement sending via webhook + slack/discord + email
2021-01-11 18:43:59 +01:00

99 lines
3.4 KiB
Python

"""event utilities"""
import re
from dataclasses import asdict, is_dataclass
from typing import Any, Dict, Optional
from uuid import UUID
from django.contrib.auth.models import AnonymousUser
from django.core.handlers.wsgi import WSGIRequest
from django.db import models
from django.db.models.base import Model
from django.http.request import HttpRequest
from django.views.debug import SafeExceptionReporterFilter
from guardian.utils import get_anonymous_user
from authentik.core.models import User
from authentik.policies.types import PolicyRequest
# Special keys which are *not* cleaned, even when the default filter
# is matched
ALLOWED_SPECIAL_KEYS = re.compile("passing", flags=re.I)
def cleanse_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
"""Cleanse a dictionary, recursively"""
final_dict = {}
for key, value in source.items():
try:
if SafeExceptionReporterFilter.hidden_settings.search(
key
) and not ALLOWED_SPECIAL_KEYS.search(key):
final_dict[key] = SafeExceptionReporterFilter.cleansed_substitute
else:
final_dict[key] = value
except TypeError: # pragma: no cover
final_dict[key] = value
if isinstance(value, dict):
final_dict[key] = cleanse_dict(value)
return final_dict
def model_to_dict(model: Model) -> Dict[str, Any]:
"""Convert model to dict"""
name = str(model)
if hasattr(model, "name"):
name = model.name
return {
"app": model._meta.app_label,
"model_name": model._meta.model_name,
"pk": model.pk,
"name": name,
}
def get_user(user: User, original_user: Optional[User] = None) -> Dict[str, Any]:
"""Convert user object to dictionary, optionally including the original user"""
if isinstance(user, AnonymousUser):
user = get_anonymous_user()
user_data = {
"username": user.username,
"pk": user.pk,
"email": user.email,
}
if original_user:
original_data = get_user(original_user)
original_data["on_behalf_of"] = user_data
return original_data
return user_data
def sanitize_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
"""clean source of all Models that would interfere with the JSONField.
Models are replaced with a dictionary of {
app: str,
name: str,
pk: Any
}"""
final_dict = {}
for key, value in source.items():
if is_dataclass(value):
# Because asdict calls `copy.deepcopy(obj)` on everything thats not tuple/dict,
# and deepcopy doesn't work with HttpRequests (neither django nor rest_framework).
# Currently, the only dataclass that actually holds an http request is a PolicyRequest
if isinstance(value, PolicyRequest):
value.http_request = None
value = asdict(value)
if isinstance(value, dict):
final_dict[key] = sanitize_dict(value)
elif isinstance(value, User):
final_dict[key] = sanitize_dict(get_user(value))
elif isinstance(value, models.Model):
final_dict[key] = sanitize_dict(model_to_dict(value))
elif isinstance(value, UUID):
final_dict[key] = value.hex
elif isinstance(value, (HttpRequest, WSGIRequest)):
continue
else:
final_dict[key] = value
return final_dict