a4dc6d13b5
* events: rename audit to events * policies/expression: log expression exceptions as event * policies/expression: add ExpressionPolicy Model to event when possible * lib/expressions: ensure syntax errors are logged too * lib: fix lint error * policies: add execution_logging field * core: add property mapping tests * policies/expression: add full test * policies/expression: fix attribute name * policies: add execution_logging * web: fix imports * root: update swagger * policies: use dataclass instead of dict for types * events: add support for dataclass as event param * events: add special keys which are never cleaned * policies: add tests for process, don't clean full cache * admin: create event when new version is seen * events: move utils to separate file * admin: add tests for admin tasks * events: add .set_user method to ensure users have correct attributes set * core: add test for property_mapping errors with user and request
87 lines
3 KiB
Python
87 lines
3 KiB
Python
"""Events middleware"""
|
|
from functools import partial
|
|
from typing import Callable
|
|
|
|
from django.contrib.auth.models import User
|
|
from django.db.models import Model
|
|
from django.db.models.signals import post_save, pre_delete
|
|
from django.http import HttpRequest, HttpResponse
|
|
|
|
from authentik.core.middleware import LOCAL
|
|
from authentik.events.models import Event, EventAction
|
|
from authentik.events.signals import EventNewThread
|
|
from authentik.events.utils import model_to_dict
|
|
|
|
|
|
class AuditMiddleware:
|
|
"""Register handlers for duration of request-response that log creation/update/deletion
|
|
of models"""
|
|
|
|
get_response: Callable[[HttpRequest], HttpResponse]
|
|
|
|
def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
|
|
self.get_response = get_response
|
|
|
|
def __call__(self, request: HttpRequest) -> HttpResponse:
|
|
# Connect signal for automatic logging
|
|
if hasattr(request, "user") and getattr(
|
|
request.user, "is_authenticated", False
|
|
):
|
|
post_save_handler = partial(
|
|
self.post_save_handler, user=request.user, request=request
|
|
)
|
|
pre_delete_handler = partial(
|
|
self.pre_delete_handler, user=request.user, request=request
|
|
)
|
|
post_save.connect(
|
|
post_save_handler,
|
|
dispatch_uid=LOCAL.authentik["request_id"],
|
|
weak=False,
|
|
)
|
|
pre_delete.connect(
|
|
pre_delete_handler,
|
|
dispatch_uid=LOCAL.authentik["request_id"],
|
|
weak=False,
|
|
)
|
|
|
|
response = self.get_response(request)
|
|
|
|
post_save.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
|
|
pre_delete.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
|
|
|
|
return response
|
|
|
|
# pylint: disable=unused-argument
|
|
def process_exception(self, request: HttpRequest, exception: Exception):
|
|
"""Unregister handlers in case of exception"""
|
|
post_save.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
|
|
pre_delete.disconnect(dispatch_uid=LOCAL.authentik["request_id"])
|
|
|
|
@staticmethod
|
|
# pylint: disable=unused-argument
|
|
def post_save_handler(
|
|
user: User, request: HttpRequest, sender, instance: Model, created: bool, **_
|
|
):
|
|
"""Signal handler for all object's post_save"""
|
|
if isinstance(instance, Event):
|
|
return
|
|
|
|
action = EventAction.MODEL_CREATED if created else EventAction.MODEL_UPDATED
|
|
EventNewThread(action, request, user=user, model=model_to_dict(instance)).run()
|
|
|
|
@staticmethod
|
|
# pylint: disable=unused-argument
|
|
def pre_delete_handler(
|
|
user: User, request: HttpRequest, sender, instance: Model, **_
|
|
):
|
|
"""Signal handler for all object's pre_delete"""
|
|
if isinstance(instance, Event):
|
|
return
|
|
|
|
EventNewThread(
|
|
EventAction.MODEL_DELETED,
|
|
request,
|
|
user=user,
|
|
model=model_to_dict(instance),
|
|
).run()
|