stages/authenticator_validate: fix error with passwordless webauthn login, improve tests

closes #4527

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens Langhammer 2023-01-25 14:44:30 +01:00
parent 0abbe8288e
commit 68058fb2ae
No known key found for this signature in database
2 changed files with 128 additions and 41 deletions

View File

@ -376,7 +376,7 @@ class AuthenticatorValidateStageView(ChallengeStageView):
def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse: def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
# All validation is done by the serializer # All validation is done by the serializer
user = self.get_pending_user() user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
if not user: if not user:
if "webauthn" not in response.data: if "webauthn" not in response.data:
return self.executor.stage_invalid() return self.executor.stage_invalid()

View File

@ -9,9 +9,10 @@ from webauthn.helpers.bytes_to_base64url import bytes_to_base64url
from authentik.core.tests.utils import create_test_admin_user, create_test_flow from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.flows.models import FlowStageBinding, NotConfiguredAction from authentik.flows.models import FlowStageBinding, NotConfiguredAction
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.stage import StageView from authentik.flows.stage import StageView
from authentik.flows.tests import FlowTestCase from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import FlowExecutorView from authentik.flows.views.executor import SESSION_KEY_PLAN, FlowExecutorView
from authentik.lib.generators import generate_id from authentik.lib.generators import generate_id
from authentik.lib.tests.utils import get_request from authentik.lib.tests.utils import get_request
from authentik.stages.authenticator_validate.challenge import ( from authentik.stages.authenticator_validate.challenge import (
@ -20,10 +21,14 @@ from authentik.stages.authenticator_validate.challenge import (
validate_challenge_webauthn, validate_challenge_webauthn,
) )
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView from authentik.stages.authenticator_validate.stage import (
SESSION_KEY_DEVICE_CHALLENGES,
AuthenticatorValidateStageView,
)
from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice
from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE from authentik.stages.authenticator_webauthn.stage import SESSION_KEY_WEBAUTHN_CHALLENGE
from authentik.stages.identification.models import IdentificationStage, UserFields from authentik.stages.identification.models import IdentificationStage, UserFields
from authentik.stages.user_login.models import UserLoginStage
class AuthenticatorValidateStageWebAuthnTests(FlowTestCase): class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
@ -185,10 +190,7 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
def test_validate_challenge(self): def test_validate_challenge(self):
"""Test webauthn""" """Test webauthn"""
request = get_request("/") device = WebAuthnDevice.objects.create(
request.user = self.user
WebAuthnDevice.objects.create(
user=self.user, user=self.user,
public_key=( public_key=(
"pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J" "pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J"
@ -204,37 +206,43 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
not_configured_action=NotConfiguredAction.CONFIGURE, not_configured_action=NotConfiguredAction.CONFIGURE,
device_classes=[DeviceClasses.WEBAUTHN], device_classes=[DeviceClasses.WEBAUTHN],
) )
stage_view = AuthenticatorValidateStageView( session = self.client.session
FlowExecutorView(flow=flow, current_stage=stage), request=request plan = FlowPlan(flow_pk=flow.pk.hex)
) plan.append_stage(stage)
request = get_request("/") plan.append_stage(UserLoginStage(name=generate_id()))
request.session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes( plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_DEVICE_CHALLENGES] = [
{
"device_class": device.__class__.__name__.lower().replace("device", ""),
"device_uid": device.pk,
"challenge": {},
}
]
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
( (
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1" "g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1"
"jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA" "jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
) )
) )
request.session.save() session.save()
stage_view = AuthenticatorValidateStageView( response = self.client.post(
FlowExecutorView(flow=flow, current_stage=stage), request=request reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
) data={
request.META["SERVER_NAME"] = "localhost" "webauthn": {
request.META["SERVER_PORT"] = "9000"
validate_challenge_webauthn(
{
"id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", "id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU", "rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"type": "public-key", "type": "public-key",
"assertionClientExtensions": "{}", "assertionClientExtensions": "{}",
"response": { "response": {
"clientDataJSON": ( "clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvNWx4TGZo" "eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvNWx4T"
"ckQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQWdPNFdoLUJxYTU1" "GZockQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQWdPNFdoLU"
"NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0OjkwMDAiLCJjcm9z" "JxYTU1NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0OjkwMDA"
"c09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2JlX2FkZGVkX2hlcmUiOiJkbyBub3Qg" "iLCJjcm9zc09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2JlX2FkZGVkX2hlcmUi"
"Y29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2FpbnN0IGEgdGVtcGxhdGUuIFNlZSBodHRwczov" "OiJkbyBub3QgY29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2FpbnN0IGEgdGVtcGxhdGUuI"
"L2dvby5nbC95YWJQZXgifQ==", "FNlZSBodHRwczovL2dvby5nbC95YWJQZXgifQ==",
), ),
"signature": ( "signature": (
"MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI" "MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI"
@ -244,9 +252,88 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
"userHandle": None, "userHandle": None,
}, },
}, },
stage_view, },
self.user, SERVER_NAME="localhost",
SERVER_PORT="9000",
) )
self.assertEqual(response.status_code, 302)
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
)
self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
def test_validate_challenge_userless(self):
"""Test webauthn"""
device = WebAuthnDevice.objects.create(
user=self.user,
public_key=(
"pQECAyYgASFYIGsBLkklToCQkT7qJT_bJYN1sEc1oJdbnmoOc43i0J"
"H6IlggLTXytuhzFVYYAK4PQNj8_coGrbbzSfUxdiPAcZTQCyU"
),
credential_id="QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
sign_count=4,
rp_id=generate_id(),
)
flow = create_test_flow()
stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
not_configured_action=NotConfiguredAction.CONFIGURE,
device_classes=[DeviceClasses.WEBAUTHN],
)
session = self.client.session
plan = FlowPlan(flow_pk=flow.pk.hex)
plan.append_stage(stage)
plan.append_stage(UserLoginStage(name=generate_id()))
session[SESSION_KEY_PLAN] = plan
session[SESSION_KEY_DEVICE_CHALLENGES] = [
{
"device_class": device.__class__.__name__.lower().replace("device", ""),
"device_uid": device.pk,
"challenge": {},
}
]
session[SESSION_KEY_WEBAUTHN_CHALLENGE] = base64url_to_bytes(
(
"g98I51mQvZXo5lxLfhrD2zfolhZbLRyCgqkkYap1"
"jwSaJ13BguoJWCF9_Lg3AgO4Wh-Bqa556JE20oKsYbl6RA"
)
)
session.save()
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
data={
"webauthn": {
"id": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"rawId": "QKZ97ASJAOIDyipAs6mKUxDUZgDrWrbAsUb5leL7-oU",
"type": "public-key",
"assertionClientExtensions": "{}",
"response": {
"clientDataJSON": (
"eyJ0eXBlIjoid2ViYXV0aG4uZ2V0IiwiY2hhbGxlbmdlIjoiZzk4STUxbVF2WlhvNWx4T"
"GZockQyemZvbGhaYkxSeUNncWtrWWFwMWp3U2FKMTNCZ3VvSldDRjlfTGczQWdPNFdoLU"
"JxYTU1NkpFMjBvS3NZYmw2UkEiLCJvcmlnaW4iOiJodHRwOi8vbG9jYWxob3N0OjkwMDA"
"iLCJjcm9zc09yaWdpbiI6ZmFsc2UsIm90aGVyX2tleXNfY2FuX2JlX2FkZGVkX2hlcmUi"
"OiJkbyBub3QgY29tcGFyZSBjbGllbnREYXRhSlNPTiBhZ2FpbnN0IGEgdGVtcGxhdGUuI"
"FNlZSBodHRwczovL2dvby5nbC95YWJQZXgifQ==",
),
"signature": (
"MEQCIFNlrHf9ablJAalXLWkrqvHB8oIu8kwvRpH3X3rbJVpI"
"AiAqtOK6mIZPk62kZN0OzFsHfuvu_RlOl7zlqSNzDdz_Ag=="
),
"authenticatorData": "SZYN5YgOjGh0NBcPZHZgW4_krrmihjLHmVzzuoMdl2MFAAAABQ==",
"userHandle": None,
},
},
},
SERVER_NAME="localhost",
SERVER_PORT="9000",
)
self.assertEqual(response.status_code, 302)
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
)
self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
def test_validate_challenge_invalid(self): def test_validate_challenge_invalid(self):
"""Test webauthn""" """Test webauthn"""