policies: fix test API not working, add tests

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
Jens Langhammer 2021-03-30 17:12:50 +02:00
parent 41914d9b7a
commit 6916c59483
2 changed files with 33 additions and 7 deletions


@ -1,6 +1,5 @@
"""policy API Views""" """policy API Views"""
from django.core.cache import cache from django.core.cache import cache
from django.http.response import HttpResponseBadRequest
from django.urls import reverse from django.urls import reverse
from drf_yasg.utils import no_body, swagger_auto_schema from drf_yasg.utils import no_body, swagger_auto_schema
from guardian.shortcuts import get_objects_for_user from guardian.shortcuts import get_objects_for_user
@ -127,8 +126,6 @@ class PolicyViewSet(
@action(detail=False, methods=["POST"]) @action(detail=False, methods=["POST"])
def cache_clear(self, request: Request) -> Response: def cache_clear(self, request: Request) -> Response:
"""Clear policy cache""" """Clear policy cache"""
if not request.user.is_superuser:
return HttpResponseBadRequest()
keys = cache.keys("policy_*") keys = cache.keys("policy_*")
cache.delete_many(keys) cache.delete_many(keys)
LOGGER.debug("Cleared Policy cache", keys=len(keys)) LOGGER.debug("Cleared Policy cache", keys=len(keys))
@ -143,16 +140,17 @@ class PolicyViewSet(
responses={200: PolicyTestResultSerializer()}, responses={200: PolicyTestResultSerializer()},
) )
@action(detail=True, methods=["POST"]) @action(detail=True, methods=["POST"])
def test(self, request: Request) -> Response: # pylint: disable=unused-argument, invalid-name
def test(self, request: Request, pk: str) -> Response:
"""Test policy""" """Test policy"""
policy = self.get_object() policy = self.get_object()
test_params = PolicyTestSerializer(request.data) test_params = PolicyTestSerializer(data=request.data)
if not test_params.is_valid(): if not test_params.is_valid():
return Response(test_params.errors, status=400) return Response(test_params.errors, status=400)
# User permission check, only allow policy testing for users that are readable # User permission check, only allow policy testing for users that are readable
users = get_objects_for_user(request.user, "authentik_core.view_user").filter( users = get_objects_for_user(request.user, "authentik_core.view_user").filter(
pk=test_params["user"] pk=test_params.validated_data["user"].pk
) )
if not users.exists(): if not users.exists():
raise PermissionDenied() raise PermissionDenied()
@ -165,4 +163,4 @@ class PolicyViewSet(
proc = PolicyProcess(PolicyBinding(policy=policy), p_request, None) proc = PolicyProcess(PolicyBinding(policy=policy), p_request, None)
result = proc.execute() result = proc.execute()
response = PolicyTestResultSerializer(result) response = PolicyTestResultSerializer(result)
return Response(response) return Response(response.data)
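Review bot: The superuser check removed from `cache_clear` has no replacement in the visible lines, so whether a non-superuser can now flush every `policy_*` cache key depends on viewset-level permissions that this section does not show. The commit title only mentions the test API, so it is unclear whether the removal is deliberate. If the action should stay restricted, keep the gate but answer with a 403 instead of the old 400, e.g. `if not request.user.is_superuser: raise PermissionDenied()`. If the viewset's permission classes already cover it, add a comment on the action saying which permission protects it. The class body starts and continues outside the hunks shown.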


@ -0,0 +1,28 @@
"""Test policies API"""
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.models import User
from authentik.policies.dummy.models import DummyPolicy
class TestPoliciesAPI(APITestCase):
"""Test policies API"""
def setUp(self) -> None:
super().setUp()
self.policy = DummyPolicy.objects.create(name="dummy", result=True)
self.user = User.objects.get(username="akadmin")
self.client.force_login(self.user)
def test_test_call(self):
"""Test Policy's test endpoint"""
response = self.client.post(
reverse("authentik_api:policy-test", kwargs={"pk": self.policy.pk}),
data={
"user": self.user.pk,
},
)
self.assertJSONEqual(
response.content.decode(), {"passing": True, "messages": ["dummy"]}
)
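Review bot: The only test case logs in as `akadmin` (presumably a superuser) and submits that same user, so neither the `PermissionDenied` branch nor the 400 response for invalid input in the `test` action is exercised. A second case should log in as a user without `authentik_core.view_user` on the target and assert `self.assertEqual(response.status_code, 403)`. A third should post an empty body and assert a 400. The existing case should also assert `response.status_code == 200` before comparing the JSON, so a failure shows as a status mismatch rather than a decode error.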