* events: rename audit to events * policies/expression: log expression exceptions as event * policies/expression: add ExpressionPolicy Model to event when possible * lib/expressions: ensure syntax errors are logged too * lib: fix lint error * policies: add execution_logging field * core: add property mapping tests * policies/expression: add full test * policies/expression: fix attribute name * policies: add execution_logging * web: fix imports * root: update swagger * policies: use dataclass instead of dict for types * events: add support for dataclass as event param * events: add special keys which are never cleaned * policies: add tests for process, don't clean full cache * admin: create event when new version is seen * events: move utils to separate file * admin: add tests for admin tasks * events: add .set_user method to ensure users have correct attributes set * core: add test for property_mapping errors with user and request
26 lines
840 B
Python
26 lines
840 B
Python
"""authentik administration overview"""
|
|
from rest_framework.mixins import ListModelMixin
|
|
from rest_framework.permissions import IsAdminUser
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.serializers import Serializer
|
|
from rest_framework.viewsets import GenericViewSet
|
|
|
|
from authentik.root.celery import CELERY_APP
|
|
|
|
|
|
class WorkerViewSet(ListModelMixin, GenericViewSet):
    """Get currently connected worker count.

    Access is restricted to admin users via ``IsAdminUser``. The viewset is
    not backed by a model: ``list`` is overridden to build its response from
    a ping of the Celery app instead of from a queryset.
    """

    permission_classes = [IsAdminUser]
    # Bare serializer: the response payload is assembled by hand in ``list``.
    serializer_class = Serializer

    def get_queryset(self):  # pragma: no cover
        """Return ``None``; the overridden ``list`` never reads a queryset."""
        return None

    def list(self, request: Request) -> Response:
        """Get currently connected worker count.

        Pings the Celery app with a 0.5 second timeout and reports the
        number of replies received under ``pagination.count``.
        """
        # One reply is expected per worker that answers within the timeout.
        replies = CELERY_APP.control.ping(timeout=0.5)
        payload = {"pagination": {"count": len(replies)}}
        return Response(payload)
|