core: add token tests for invalid intent and token auth
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
888526a2a7
commit
a2578ffaad
|
@ -1,15 +1,12 @@
|
|||
"""Test Source flow_manager"""
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.contrib.messages.middleware import MessageMiddleware
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.http.request import HttpRequest
|
||||
from django.test import TestCase
|
||||
from django.test.client import RequestFactory
|
||||
from guardian.utils import get_anonymous_user
|
||||
|
||||
from authentik.core.models import SourceUserMatchingModes, User
|
||||
from authentik.core.sources.flow_manager import Action
|
||||
from authentik.flows.tests.test_planner import dummy_get_response
|
||||
from authentik.lib.tests.utils import get_request
|
||||
from authentik.providers.oauth2.generators import generate_client_id
|
||||
from authentik.sources.oauth.models import OAuthSource, UserOAuthSourceConnection
|
||||
from authentik.sources.oauth.views.callback import OAuthSourceFlowManager
|
||||
|
@ -24,22 +21,10 @@ class TestSourceFlowManager(TestCase):
|
|||
self.factory = RequestFactory()
|
||||
self.identifier = generate_client_id()
|
||||
|
||||
def get_request(self, user: User) -> HttpRequest:
|
||||
"""Helper to create a get request with session and message middleware"""
|
||||
request = self.factory.get("/")
|
||||
request.user = user
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(request)
|
||||
request.session.save()
|
||||
middleware = MessageMiddleware(dummy_get_response)
|
||||
middleware.process_request(request)
|
||||
request.session.save()
|
||||
return request
|
||||
|
||||
def test_unauthenticated_enroll(self):
|
||||
"""Test un-authenticated user enrolling"""
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source, self.get_request(AnonymousUser()), self.identifier, {}
|
||||
self.source, get_request("/", user=AnonymousUser()), self.identifier, {}
|
||||
)
|
||||
action, _ = flow_manager.get_action()
|
||||
self.assertEqual(action, Action.ENROLL)
|
||||
|
@ -52,7 +37,7 @@ class TestSourceFlowManager(TestCase):
|
|||
)
|
||||
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source, self.get_request(AnonymousUser()), self.identifier, {}
|
||||
self.source, get_request("/", user=AnonymousUser()), self.identifier, {}
|
||||
)
|
||||
action, _ = flow_manager.get_action()
|
||||
self.assertEqual(action, Action.AUTH)
|
||||
|
@ -65,7 +50,7 @@ class TestSourceFlowManager(TestCase):
|
|||
)
|
||||
user = User.objects.create(username="foo", email="foo@bar.baz")
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source, self.get_request(user), self.identifier, {}
|
||||
self.source, get_request("/", user=user), self.identifier, {}
|
||||
)
|
||||
action, _ = flow_manager.get_action()
|
||||
self.assertEqual(action, Action.LINK)
|
||||
|
@ -78,7 +63,7 @@ class TestSourceFlowManager(TestCase):
|
|||
|
||||
# Without email, deny
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source, self.get_request(AnonymousUser()), self.identifier, {}
|
||||
self.source, get_request("/", user=AnonymousUser()), self.identifier, {}
|
||||
)
|
||||
action, _ = flow_manager.get_action()
|
||||
self.assertEqual(action, Action.DENY)
|
||||
|
@ -86,7 +71,7 @@ class TestSourceFlowManager(TestCase):
|
|||
# With email
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source,
|
||||
self.get_request(AnonymousUser()),
|
||||
get_request("/", user=AnonymousUser()),
|
||||
self.identifier,
|
||||
{"email": "foo@bar.baz"},
|
||||
)
|
||||
|
@ -101,7 +86,7 @@ class TestSourceFlowManager(TestCase):
|
|||
|
||||
# Without username, deny
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source, self.get_request(AnonymousUser()), self.identifier, {}
|
||||
self.source, get_request("/", user=AnonymousUser()), self.identifier, {}
|
||||
)
|
||||
action, _ = flow_manager.get_action()
|
||||
self.assertEqual(action, Action.DENY)
|
||||
|
@ -109,7 +94,7 @@ class TestSourceFlowManager(TestCase):
|
|||
# With username
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source,
|
||||
self.get_request(AnonymousUser()),
|
||||
get_request("/", user=AnonymousUser()),
|
||||
self.identifier,
|
||||
{"username": "foo"},
|
||||
)
|
||||
|
@ -125,7 +110,7 @@ class TestSourceFlowManager(TestCase):
|
|||
# With non-existent username, enroll
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source,
|
||||
self.get_request(AnonymousUser()),
|
||||
get_request("/", user=AnonymousUser()),
|
||||
self.identifier,
|
||||
{
|
||||
"username": "bar",
|
||||
|
@ -137,7 +122,7 @@ class TestSourceFlowManager(TestCase):
|
|||
# With username
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source,
|
||||
self.get_request(AnonymousUser()),
|
||||
get_request("/", user=AnonymousUser()),
|
||||
self.identifier,
|
||||
{"username": "foo"},
|
||||
)
|
||||
|
@ -151,7 +136,7 @@ class TestSourceFlowManager(TestCase):
|
|||
|
||||
flow_manager = OAuthSourceFlowManager(
|
||||
self.source,
|
||||
self.get_request(AnonymousUser()),
|
||||
get_request("/", user=AnonymousUser()),
|
||||
self.identifier,
|
||||
{"username": "foo"},
|
||||
)
|
||||
|
|
|
@ -27,6 +27,14 @@ class TestTokenAPI(APITestCase):
|
|||
self.assertEqual(token.intent, TokenIntents.INTENT_API)
|
||||
self.assertEqual(token.expiring, True)
|
||||
|
||||
def test_token_create_invalid(self):
|
||||
"""Test token creation endpoint (invalid data)"""
|
||||
response = self.client.post(
|
||||
reverse("authentik_api:token-list"),
|
||||
{"identifier": "test-token", "intent": TokenIntents.INTENT_RECOVERY},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_token_create_non_expiring(self):
|
||||
"""Test token creation endpoint"""
|
||||
self.user.attributes[USER_ATTRIBUTE_TOKEN_EXPIRING] = False
|
||||
|
|
40
authentik/core/tests/test_token_auth.py
Normal file
40
authentik/core/tests/test_token_auth.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
"""Test token auth"""
|
||||
from django.test import TestCase
|
||||
|
||||
from authentik.core.auth import TokenBackend
|
||||
from authentik.core.models import Token, TokenIntents, User
|
||||
from authentik.flows.planner import FlowPlan
|
||||
from authentik.flows.views import SESSION_KEY_PLAN
|
||||
from authentik.lib.tests.utils import get_request
|
||||
|
||||
|
||||
class TestTokenAuth(TestCase):
|
||||
"""Test token auth"""
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.user = User.objects.create(username="test-user")
|
||||
self.token = Token.objects.create(
|
||||
expiring=False, user=self.user, intent=TokenIntents.INTENT_APP_PASSWORD
|
||||
)
|
||||
# To test with session we need to create a request and pass it through all middlewares
|
||||
self.request = get_request("/")
|
||||
self.request.session[SESSION_KEY_PLAN] = FlowPlan("test")
|
||||
|
||||
def test_token_auth(self):
|
||||
"""Test auth with token"""
|
||||
self.assertEqual(
|
||||
TokenBackend().authenticate(self.request, "test-user", self.token.key), self.user
|
||||
)
|
||||
|
||||
def test_token_auth_none(self):
|
||||
"""Test auth with token (non-existent user)"""
|
||||
self.assertIsNone(
|
||||
TokenBackend().authenticate(self.request, "test-user-foo", self.token.key), self.user
|
||||
)
|
||||
|
||||
def test_token_auth_invalid(self):
|
||||
"""Test auth with token (invalid token)"""
|
||||
self.assertIsNone(
|
||||
TokenBackend().authenticate(self.request, "test-user", self.token.key + "foo"),
|
||||
self.user,
|
||||
)
|
|
@ -3,7 +3,6 @@ from unittest.mock import MagicMock, Mock, PropertyMock, patch
|
|||
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.core.cache import cache
|
||||
from django.http import HttpRequest
|
||||
from django.test import RequestFactory, TestCase
|
||||
from django.urls import reverse
|
||||
from guardian.shortcuts import get_anonymous_user
|
||||
|
@ -13,6 +12,7 @@ from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableExce
|
|||
from authentik.flows.markers import ReevaluateMarker, StageMarker
|
||||
from authentik.flows.models import Flow, FlowDesignation, FlowStageBinding
|
||||
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner, cache_key
|
||||
from authentik.lib.tests.utils import dummy_get_response
|
||||
from authentik.policies.dummy.models import DummyPolicy
|
||||
from authentik.policies.models import PolicyBinding
|
||||
from authentik.policies.types import PolicyResult
|
||||
|
@ -24,11 +24,6 @@ CACHE_MOCK = Mock(wraps=cache)
|
|||
POLICY_RETURN_TRUE = MagicMock(return_value=PolicyResult(True))
|
||||
|
||||
|
||||
def dummy_get_response(request: HttpRequest): # pragma: no cover
|
||||
"""Dummy get_response for SessionMiddleware"""
|
||||
return None
|
||||
|
||||
|
||||
class TestFlowPlanner(TestCase):
|
||||
"""Test planner logic"""
|
||||
|
||||
|
|
27
authentik/lib/tests/utils.py
Normal file
27
authentik/lib/tests/utils.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
"""Test utils"""
|
||||
from django.contrib.messages.middleware import MessageMiddleware
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.http import HttpRequest
|
||||
from django.test.client import RequestFactory
|
||||
from guardian.utils import get_anonymous_user
|
||||
|
||||
|
||||
def dummy_get_response(request: HttpRequest): # pragma: no cover
|
||||
"""Dummy get_response for SessionMiddleware"""
|
||||
return None
|
||||
|
||||
|
||||
def get_request(*args, user=None, **kwargs):
|
||||
"""Get a request with usable session"""
|
||||
request = RequestFactory().get(*args, **kwargs)
|
||||
if user:
|
||||
request.user = user
|
||||
else:
|
||||
request.user = get_anonymous_user()
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(request)
|
||||
request.session.save()
|
||||
middleware = MessageMiddleware(dummy_get_response)
|
||||
middleware.process_request(request)
|
||||
request.session.save()
|
||||
return request
|
|
@ -1,16 +1,14 @@
|
|||
"""Test AuthN Request generator and parser"""
|
||||
from base64 import b64encode
|
||||
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.http.request import QueryDict
|
||||
from django.test import RequestFactory, TestCase
|
||||
from guardian.utils import get_anonymous_user
|
||||
|
||||
from authentik.core.models import User
|
||||
from authentik.crypto.models import CertificateKeyPair
|
||||
from authentik.events.models import Event, EventAction
|
||||
from authentik.flows.models import Flow
|
||||
from authentik.flows.tests.test_planner import dummy_get_response
|
||||
from authentik.lib.tests.utils import get_request
|
||||
from authentik.managed.manager import ObjectManager
|
||||
from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider
|
||||
from authentik.providers.saml.processors.assertion import AssertionProcessor
|
||||
|
@ -99,11 +97,7 @@ class TestAuthNRequest(TestCase):
|
|||
|
||||
def test_signed_valid(self):
|
||||
"""Test generated AuthNRequest with valid signature"""
|
||||
http_request = self.factory.get("/")
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/")
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
@ -117,12 +111,7 @@ class TestAuthNRequest(TestCase):
|
|||
|
||||
def test_request_full_signed(self):
|
||||
"""Test full SAML Request/Response flow, fully signed"""
|
||||
http_request = self.factory.get("/")
|
||||
http_request.user = get_anonymous_user()
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/")
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
@ -145,12 +134,7 @@ class TestAuthNRequest(TestCase):
|
|||
|
||||
def test_request_id_invalid(self):
|
||||
"""Test generated AuthNRequest with invalid request ID"""
|
||||
http_request = self.factory.get("/")
|
||||
http_request.user = get_anonymous_user()
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/")
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
@ -179,11 +163,7 @@ class TestAuthNRequest(TestCase):
|
|||
|
||||
def test_signed_valid_detached(self):
|
||||
"""Test generated AuthNRequest with valid signature (detached)"""
|
||||
http_request = self.factory.get("/")
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/")
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
@ -243,12 +223,7 @@ class TestAuthNRequest(TestCase):
|
|||
|
||||
def test_request_attributes(self):
|
||||
"""Test full SAML Request/Response flow, fully signed"""
|
||||
http_request = self.factory.get("/")
|
||||
http_request.user = User.objects.get(username="akadmin")
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/", user=User.objects.get(username="akadmin"))
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
@ -264,12 +239,7 @@ class TestAuthNRequest(TestCase):
|
|||
|
||||
def test_request_attributes_invalid(self):
|
||||
"""Test full SAML Request/Response flow, fully signed"""
|
||||
http_request = self.factory.get("/")
|
||||
http_request.user = User.objects.get(username="akadmin")
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/", user=User.objects.get(username="akadmin"))
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
"""Test Requests and Responses against schema"""
|
||||
from base64 import b64encode
|
||||
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.test import RequestFactory, TestCase
|
||||
from guardian.utils import get_anonymous_user
|
||||
from lxml import etree # nosec
|
||||
|
||||
from authentik.crypto.models import CertificateKeyPair
|
||||
from authentik.flows.models import Flow
|
||||
from authentik.lib.tests.utils import get_request
|
||||
from authentik.managed.manager import ObjectManager
|
||||
from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider
|
||||
from authentik.providers.saml.processors.assertion import AssertionProcessor
|
||||
from authentik.providers.saml.processors.request_parser import AuthNRequestParser
|
||||
from authentik.providers.saml.tests.test_auth_n_request import dummy_get_response
|
||||
from authentik.sources.saml.models import SAMLSource
|
||||
from authentik.sources.saml.processors.request import RequestProcessor
|
||||
|
||||
|
@ -43,11 +41,7 @@ class TestSchema(TestCase):
|
|||
|
||||
def test_request_schema(self):
|
||||
"""Test generated AuthNRequest against Schema"""
|
||||
http_request = self.factory.get("/")
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/")
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
@ -60,12 +54,7 @@ class TestSchema(TestCase):
|
|||
|
||||
def test_response_schema(self):
|
||||
"""Test generated AuthNRequest against Schema"""
|
||||
http_request = self.factory.get("/")
|
||||
http_request.user = get_anonymous_user()
|
||||
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(http_request)
|
||||
http_request.session.save()
|
||||
http_request = get_request("/")
|
||||
|
||||
# First create an AuthNRequest
|
||||
request_proc = RequestProcessor(self.source, http_request, "test_state")
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
"""Test validator stage"""
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from django.contrib.sessions.middleware import SessionMiddleware
|
||||
from django.test import TestCase
|
||||
from django.test.client import RequestFactory
|
||||
from django.urls.base import reverse
|
||||
|
@ -12,7 +11,7 @@ from rest_framework.exceptions import ValidationError
|
|||
from authentik.core.models import User
|
||||
from authentik.flows.challenge import ChallengeTypes
|
||||
from authentik.flows.models import Flow, FlowStageBinding, NotConfiguredAction
|
||||
from authentik.flows.tests.test_planner import dummy_get_response
|
||||
from authentik.lib.tests.utils import get_request
|
||||
from authentik.providers.oauth2.generators import generate_client_id, generate_client_secret
|
||||
from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
|
||||
from authentik.stages.authenticator_validate.api import AuthenticatorValidateStageSerializer
|
||||
|
@ -97,11 +96,8 @@ class AuthenticatorValidateStageTests(TestCase):
|
|||
|
||||
def test_device_challenge_webauthn(self):
|
||||
"""Test webauthn"""
|
||||
request = self.request_factory.get("/")
|
||||
request = get_request("/")
|
||||
request.user = self.user
|
||||
middleware = SessionMiddleware(dummy_get_response)
|
||||
middleware.process_request(request)
|
||||
request.session.save()
|
||||
|
||||
webauthn_device = WebAuthnDevice.objects.create(
|
||||
user=self.user,
|
||||
|
|
Reference in a new issue