"""passbook administration overview""" import time from collections import Counter from datetime import timedelta from typing import Dict, List from django.db.models import Count, ExpressionWrapper, F from django.db.models.fields import DurationField from django.db.models.functions import ExtractHour from django.http import response from django.utils.timezone import now from drf_yasg.utils import swagger_auto_schema from rest_framework.fields import SerializerMethodField from rest_framework.permissions import IsAdminUser from rest_framework.request import Request from rest_framework.response import Response from rest_framework.serializers import Serializer from rest_framework.viewsets import ViewSet from passbook.audit.models import Event, EventAction class AdministrationMetricsSerializer(Serializer): """Overview View""" logins_per_1h = SerializerMethodField() logins_failed_per_1h = SerializerMethodField() def get_events_per_1h(self, action: str) -> List[Dict[str, int]]: """Get event count by hour in the last day, fill with zeros""" date_from = now() - timedelta(days=1) result = ( Event.objects.filter(action=action, created__gte=date_from) .annotate( age=ExpressionWrapper( now() - F("created"), output_field=DurationField() ) ) .annotate(age_hours=ExtractHour("age")) .values("age_hours") .annotate(count=Count("pk")) .order_by("age_hours") ) data = Counter({d["age_hours"]: d["count"] for d in result}) results = [] _now = now() for hour in range(0, -24, -1): results.append( { "x": time.mktime((_now + timedelta(hours=hour)).timetuple()) * 1000, "y": data[hour * -1], } ) return results def get_logins_per_1h(self, _): """Get successful logins per hour for the last 24 hours""" return self.get_events_per_1h(EventAction.LOGIN) def get_logins_failed_per_1h(self, _): """Get failed logins per hour for the last 24 hours""" return self.get_events_per_1h(EventAction.LOGIN_FAILED) def create(self, request: Request) -> response: raise NotImplementedError def update(self, request: Request) -> Response: raise NotImplementedError class AdministrationMetricsViewSet(ViewSet): """Return single instance of AdministrationMetricsSerializer""" permission_classes = [IsAdminUser] @swagger_auto_schema(responses={200: AdministrationMetricsSerializer(many=True)}) def list(self, request: Request) -> Response: """Return single instance of AdministrationMetricsSerializer""" serializer = AdministrationMetricsSerializer(True) return Response(serializer.data)