stages/invitation: accept token from prompt_data
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
6e625f7400
commit
d7631e8af0
|
@ -1,4 +1,6 @@
|
||||||
"""invitation stage logic"""
|
"""invitation stage logic"""
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from django.http import HttpRequest, HttpResponse
|
from django.http import HttpRequest, HttpResponse
|
||||||
from django.shortcuts import get_object_or_404
|
from django.shortcuts import get_object_or_404
|
||||||
|
|
||||||
|
@ -15,16 +17,26 @@ INVITATION_IN_EFFECT = "invitation_in_effect"
|
||||||
class InvitationStageView(StageView):
|
class InvitationStageView(StageView):
|
||||||
"""Finalise Authentication flow by logging the user in"""
|
"""Finalise Authentication flow by logging the user in"""
|
||||||
|
|
||||||
|
def get_token(self) -> Optional[str]:
|
||||||
|
"""Get token from saved get-arguments or prompt_data"""
|
||||||
|
if INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}):
|
||||||
|
return self.request.session[SESSION_KEY_GET][INVITATION_TOKEN_KEY]
|
||||||
|
if INVITATION_TOKEN_KEY in self.executor.plan.context.get(
|
||||||
|
PLAN_CONTEXT_PROMPT, {}
|
||||||
|
):
|
||||||
|
return self.executor.plan.context[PLAN_CONTEXT_PROMPT][INVITATION_TOKEN_KEY]
|
||||||
|
return None
|
||||||
|
|
||||||
def get(self, request: HttpRequest) -> HttpResponse:
|
def get(self, request: HttpRequest) -> HttpResponse:
|
||||||
"""Apply data to the current flow based on a URL"""
|
"""Apply data to the current flow based on a URL"""
|
||||||
stage: InvitationStage = self.executor.current_stage
|
stage: InvitationStage = self.executor.current_stage
|
||||||
if INVITATION_TOKEN_KEY not in request.session.get(SESSION_KEY_GET, {}):
|
token = self.get_token()
|
||||||
|
if not token:
|
||||||
# No Invitation was given, raise error or continue
|
# No Invitation was given, raise error or continue
|
||||||
if stage.continue_flow_without_invitation:
|
if stage.continue_flow_without_invitation:
|
||||||
return self.executor.stage_ok()
|
return self.executor.stage_ok()
|
||||||
return self.executor.stage_invalid()
|
return self.executor.stage_invalid()
|
||||||
|
|
||||||
token = request.session[SESSION_KEY_GET][INVITATION_TOKEN_KEY]
|
|
||||||
invite: Invitation = get_object_or_404(Invitation, pk=token)
|
invite: Invitation = get_object_or_404(Invitation, pk=token)
|
||||||
self.executor.plan.context[PLAN_CONTEXT_PROMPT] = invite.fixed_data
|
self.executor.plan.context[PLAN_CONTEXT_PROMPT] = invite.fixed_data
|
||||||
self.executor.plan.context[INVITATION_IN_EFFECT] = True
|
self.executor.plan.context[INVITATION_IN_EFFECT] = True
|
||||||
|
|
|
@ -95,15 +95,11 @@ class TestUserLoginStage(TestCase):
|
||||||
self.stage.continue_flow_without_invitation = False
|
self.stage.continue_flow_without_invitation = False
|
||||||
self.stage.save()
|
self.stage.save()
|
||||||
|
|
||||||
def test_with_invitation(self):
|
def test_with_invitation_get(self):
|
||||||
"""Test with invitation, check data in session"""
|
"""Test with invitation, check data in session"""
|
||||||
plan = FlowPlan(
|
plan = FlowPlan(
|
||||||
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
|
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
|
||||||
)
|
)
|
||||||
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
|
|
||||||
plan.context[
|
|
||||||
PLAN_CONTEXT_AUTHENTICATION_BACKEND
|
|
||||||
] = "django.contrib.auth.backends.ModelBackend"
|
|
||||||
session = self.client.session
|
session = self.client.session
|
||||||
session[SESSION_KEY_PLAN] = plan
|
session[SESSION_KEY_PLAN] = plan
|
||||||
session.save()
|
session.save()
|
||||||
|
@ -130,6 +126,37 @@ class TestUserLoginStage(TestCase):
|
||||||
{"to": reverse("authentik_core:root-redirect"), "type": "redirect"},
|
{"to": reverse("authentik_core:root-redirect"), "type": "redirect"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_with_invitation_prompt_data(self):
|
||||||
|
"""Test with invitation, check data in session"""
|
||||||
|
data = {"foo": "bar"}
|
||||||
|
invite = Invitation.objects.create(
|
||||||
|
created_by=get_anonymous_user(), fixed_data=data
|
||||||
|
)
|
||||||
|
|
||||||
|
plan = FlowPlan(
|
||||||
|
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
|
||||||
|
)
|
||||||
|
plan.context[PLAN_CONTEXT_PROMPT] = {INVITATION_TOKEN_KEY: invite.pk.hex}
|
||||||
|
session = self.client.session
|
||||||
|
session[SESSION_KEY_PLAN] = plan
|
||||||
|
session.save()
|
||||||
|
|
||||||
|
with patch("authentik.flows.views.FlowExecutorView.cancel", MagicMock()):
|
||||||
|
base_url = reverse(
|
||||||
|
"authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}
|
||||||
|
)
|
||||||
|
response = self.client.get(base_url)
|
||||||
|
|
||||||
|
session = self.client.session
|
||||||
|
plan: FlowPlan = session[SESSION_KEY_PLAN]
|
||||||
|
self.assertEqual(plan.context[PLAN_CONTEXT_PROMPT], data)
|
||||||
|
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
self.assertJSONEqual(
|
||||||
|
force_str(response.content),
|
||||||
|
{"to": reverse("authentik_core:root-redirect"), "type": "redirect"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestInvitationsAPI(APITestCase):
|
class TestInvitationsAPI(APITestCase):
|
||||||
"""Test Invitations API"""
|
"""Test Invitations API"""
|
||||||
|
|
Reference in New Issue