199 lines
6.6 KiB
Python
199 lines
6.6 KiB
Python
"""passbook audit models"""
|
|
from enum import Enum
|
|
from inspect import getmodule, stack
|
|
from typing import Any, Dict, Optional
|
|
from uuid import UUID, uuid4
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import models
|
|
from django.db.models.base import Model
|
|
from django.http import HttpRequest
|
|
from django.utils.translation import gettext as _
|
|
from django.views.debug import SafeExceptionReporterFilter
|
|
from guardian.utils import get_anonymous_user
|
|
from structlog import get_logger
|
|
|
|
from passbook.core.middleware import (
|
|
SESSION_IMPERSONATE_ORIGINAL_USER,
|
|
SESSION_IMPERSONATE_USER,
|
|
)
|
|
from passbook.core.models import User
|
|
from passbook.lib.utils.http import get_client_ip
|
|
|
|
LOGGER = get_logger()
|
|
|
|
|
|
def cleanse_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
|
|
"""Cleanse a dictionary, recursively"""
|
|
final_dict = {}
|
|
for key, value in source.items():
|
|
try:
|
|
if SafeExceptionReporterFilter.hidden_settings.search(key):
|
|
final_dict[key] = SafeExceptionReporterFilter.cleansed_substitute
|
|
else:
|
|
final_dict[key] = value
|
|
except TypeError:
|
|
final_dict[key] = value
|
|
if isinstance(value, dict):
|
|
final_dict[key] = cleanse_dict(value)
|
|
return final_dict
|
|
|
|
|
|
def model_to_dict(model: Model) -> Dict[str, Any]:
|
|
"""Convert model to dict"""
|
|
name = str(model)
|
|
if hasattr(model, "name"):
|
|
name = model.name
|
|
return {
|
|
"app": model._meta.app_label,
|
|
"model_name": model._meta.model_name,
|
|
"pk": model.pk,
|
|
"name": name,
|
|
}
|
|
|
|
|
|
def get_user(user: User, original_user: Optional[User] = None) -> Dict[str, Any]:
|
|
"""Convert user object to dictionary, optionally including the original user"""
|
|
if isinstance(user, AnonymousUser):
|
|
user = get_anonymous_user()
|
|
user_data = {
|
|
"username": user.username,
|
|
"pk": user.pk,
|
|
"email": user.email,
|
|
}
|
|
if original_user:
|
|
original_data = get_user(original_user)
|
|
original_data["on_behalf_of"] = user_data
|
|
return original_data
|
|
return user_data
|
|
|
|
|
|
def sanitize_dict(source: Dict[Any, Any]) -> Dict[Any, Any]:
|
|
"""clean source of all Models that would interfere with the JSONField.
|
|
Models are replaced with a dictionary of {
|
|
app: str,
|
|
name: str,
|
|
pk: Any
|
|
}"""
|
|
final_dict = {}
|
|
for key, value in source.items():
|
|
if isinstance(value, dict):
|
|
final_dict[key] = sanitize_dict(value)
|
|
elif isinstance(value, models.Model):
|
|
final_dict[key] = sanitize_dict(model_to_dict(value))
|
|
elif isinstance(value, UUID):
|
|
final_dict[key] = value.hex
|
|
else:
|
|
final_dict[key] = value
|
|
return final_dict
|
|
|
|
|
|
class EventAction(Enum):
|
|
"""All possible actions to save into the audit log"""
|
|
|
|
LOGIN = "login"
|
|
LOGIN_FAILED = "login_failed"
|
|
LOGOUT = "logout"
|
|
AUTHORIZE_APPLICATION = "authorize_application"
|
|
SUSPICIOUS_REQUEST = "suspicious_request"
|
|
SIGN_UP = "sign_up"
|
|
PASSWORD_RESET = "password_reset" # noqa # nosec
|
|
INVITE_CREATED = "invitation_created"
|
|
INVITE_USED = "invitation_used"
|
|
IMPERSONATION_STARTED = "impersonation_started"
|
|
IMPERSONATION_ENDED = "impersonation_ended"
|
|
CUSTOM = "custom"
|
|
|
|
@staticmethod
|
|
def as_choices():
|
|
"""Generate choices of actions used for database"""
|
|
return tuple(
|
|
(x, y.value) for x, y in getattr(EventAction, "__members__").items()
|
|
)
|
|
|
|
|
|
class Event(models.Model):
|
|
"""An individual audit log event"""
|
|
|
|
event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
|
|
user = models.JSONField(default=dict)
|
|
action = models.TextField(choices=EventAction.as_choices())
|
|
date = models.DateTimeField(auto_now_add=True)
|
|
app = models.TextField()
|
|
context = models.JSONField(default=dict, blank=True)
|
|
client_ip = models.GenericIPAddressField(null=True)
|
|
created = models.DateTimeField(auto_now_add=True)
|
|
|
|
@staticmethod
|
|
def _get_app_from_request(request: HttpRequest) -> str:
|
|
if not isinstance(request, HttpRequest):
|
|
return ""
|
|
return request.resolver_match.app_name
|
|
|
|
@staticmethod
|
|
def new(
|
|
action: EventAction,
|
|
app: Optional[str] = None,
|
|
_inspect_offset: int = 1,
|
|
**kwargs,
|
|
) -> "Event":
|
|
"""Create new Event instance from arguments. Instance is NOT saved."""
|
|
if not isinstance(action, EventAction):
|
|
raise ValueError(
|
|
f"action must be EventAction instance but was {type(action)}"
|
|
)
|
|
if not app:
|
|
app = getmodule(stack()[_inspect_offset][0]).__name__
|
|
cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
|
|
event = Event(action=action.value, app=app, context=cleaned_kwargs)
|
|
return event
|
|
|
|
def from_http(
|
|
self, request: HttpRequest, user: Optional[settings.AUTH_USER_MODEL] = None
|
|
) -> "Event":
|
|
"""Add data from a Django-HttpRequest, allowing the creation of
|
|
Events independently from requests.
|
|
`user` arguments optionally overrides user from requests."""
|
|
if hasattr(request, "user"):
|
|
self.user = get_user(
|
|
request.user,
|
|
request.session.get(SESSION_IMPERSONATE_ORIGINAL_USER, None),
|
|
)
|
|
if user:
|
|
self.user = get_user(user)
|
|
# Check if we're currently impersonating, and add that user
|
|
if hasattr(request, "session"):
|
|
if SESSION_IMPERSONATE_ORIGINAL_USER in request.session:
|
|
self.user = get_user(request.session[SESSION_IMPERSONATE_ORIGINAL_USER])
|
|
self.user["on_behalf_of"] = get_user(
|
|
request.session[SESSION_IMPERSONATE_USER]
|
|
)
|
|
# User 255.255.255.255 as fallback if IP cannot be determined
|
|
self.client_ip = get_client_ip(request) or "255.255.255.255"
|
|
# If there's no app set, we get it from the requests too
|
|
if not self.app:
|
|
self.app = Event._get_app_from_request(request)
|
|
self.save()
|
|
return self
|
|
|
|
def save(self, *args, **kwargs):
|
|
if not self._state.adding:
|
|
raise ValidationError(
|
|
"you may not edit an existing %s" % self._meta.model_name
|
|
)
|
|
LOGGER.debug(
|
|
"Created Audit event",
|
|
action=self.action,
|
|
context=self.context,
|
|
client_ip=self.client_ip,
|
|
user=self.user,
|
|
)
|
|
return super().save(*args, **kwargs)
|
|
|
|
class Meta:
|
|
|
|
verbose_name = _("Audit Event")
|
|
verbose_name_plural = _("Audit Events")
|