stages/invitation: accept token from prompt_data

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-04-28 22:20:54 +02:00
parent 07b9923bf6
commit 2c70301f56
2 changed files with 46 additions and 7 deletions

View file

@ -1,4 +1,6 @@
"""invitation stage logic"""
from typing import Optional
from django.http import HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404
@ -15,16 +17,26 @@ INVITATION_IN_EFFECT = "invitation_in_effect"
class InvitationStageView(StageView):
"""Finalise Authentication flow by logging the user in"""
def get_token(self) -> Optional[str]:
"""Get token from saved get-arguments or prompt_data"""
if INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}):
return self.request.session[SESSION_KEY_GET][INVITATION_TOKEN_KEY]
if INVITATION_TOKEN_KEY in self.executor.plan.context.get(
PLAN_CONTEXT_PROMPT, {}
):
return self.executor.plan.context[PLAN_CONTEXT_PROMPT][INVITATION_TOKEN_KEY]
return None
def get(self, request: HttpRequest) -> HttpResponse:
"""Apply data to the current flow based on a URL"""
stage: InvitationStage = self.executor.current_stage
if INVITATION_TOKEN_KEY not in request.session.get(SESSION_KEY_GET, {}):
token = self.get_token()
if not token:
# No Invitation was given, raise error or continue
if stage.continue_flow_without_invitation:
return self.executor.stage_ok()
return self.executor.stage_invalid()
token = request.session[SESSION_KEY_GET][INVITATION_TOKEN_KEY]
invite: Invitation = get_object_or_404(Invitation, pk=token)
self.executor.plan.context[PLAN_CONTEXT_PROMPT] = invite.fixed_data
self.executor.plan.context[INVITATION_IN_EFFECT] = True

View file

@ -95,15 +95,11 @@ class TestUserLoginStage(TestCase):
self.stage.continue_flow_without_invitation = False
self.stage.save()
def test_with_invitation(self):
def test_with_invitation_get(self):
"""Test with invitation, check data in session"""
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
plan.context[
PLAN_CONTEXT_AUTHENTICATION_BACKEND
] = "django.contrib.auth.backends.ModelBackend"
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
@ -130,6 +126,37 @@ class TestUserLoginStage(TestCase):
{"to": reverse("authentik_core:root-redirect"), "type": "redirect"},
)
def test_with_invitation_prompt_data(self):
"""Test with invitation, check data in session"""
data = {"foo": "bar"}
invite = Invitation.objects.create(
created_by=get_anonymous_user(), fixed_data=data
)
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
plan.context[PLAN_CONTEXT_PROMPT] = {INVITATION_TOKEN_KEY: invite.pk.hex}
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
with patch("authentik.flows.views.FlowExecutorView.cancel", MagicMock()):
base_url = reverse(
"authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}
)
response = self.client.get(base_url)
session = self.client.session
plan: FlowPlan = session[SESSION_KEY_PLAN]
self.assertEqual(plan.context[PLAN_CONTEXT_PROMPT], data)
self.assertEqual(response.status_code, 200)
self.assertJSONEqual(
force_str(response.content),
{"to": reverse("authentik_core:root-redirect"), "type": "redirect"},
)
class TestInvitationsAPI(APITestCase):
"""Test Invitations API"""