diff --git a/authentik/flows/challenge.py b/authentik/flows/challenge.py index 2a9237234..051978be9 100644 --- a/authentik/flows/challenge.py +++ b/authentik/flows/challenge.py @@ -72,7 +72,7 @@ class WithUserInfoChallenge(Challenge): pending_user_avatar = CharField() -class AccessDeniedChallenge(Challenge): +class AccessDeniedChallenge(WithUserInfoChallenge): """Challenge when a flow's active stage calls `stage_invalid()`.""" error_message = CharField(required=False) diff --git a/authentik/flows/stage.py b/authentik/flows/stage.py index 729f9a07a..2c7c7e5a7 100644 --- a/authentik/flows/stage.py +++ b/authentik/flows/stage.py @@ -1,4 +1,6 @@ """authentik stage Base view""" +from typing import TYPE_CHECKING, Optional + from django.contrib.auth.models import AnonymousUser from django.http import HttpRequest from django.http.request import QueryDict @@ -11,15 +13,19 @@ from structlog.stdlib import get_logger from authentik.core.models import DEFAULT_AVATAR, User from authentik.flows.challenge import ( + AccessDeniedChallenge, Challenge, ChallengeResponse, + ChallengeTypes, ContextualFlowInfo, HttpChallengeResponse, WithUserInfoChallenge, ) from authentik.flows.models import InvalidResponseAction from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER -from authentik.flows.views.executor import FlowExecutorView + +if TYPE_CHECKING: + from authentik.flows.views.executor import FlowExecutorView PLAN_CONTEXT_PENDING_USER_IDENTIFIER = "pending_user_identifier" LOGGER = get_logger() @@ -28,11 +34,11 @@ LOGGER = get_logger() class StageView(View): """Abstract Stage, inherits TemplateView but can be combined with FormView""" - executor: FlowExecutorView + executor: "FlowExecutorView" request: HttpRequest = None - def __init__(self, executor: FlowExecutorView, **kwargs): + def __init__(self, executor: "FlowExecutorView", **kwargs): self.executor = executor super().__init__(**kwargs) @@ -43,6 +49,8 @@ class StageView(View): other things besides the form display. If no user is pending, returns request.user""" + if not self.executor.plan: + return self.request.user if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display: return User( username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER), @@ -108,6 +116,8 @@ class ChallengeStageView(StageView): def format_title(self) -> str: """Allow usage of placeholder in flow title.""" + if not self.executor.plan: + return self.executor.flow.title return self.executor.flow.title % { "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, "") } @@ -169,3 +179,27 @@ class ChallengeStageView(StageView): stage_view=self, ) return HttpChallengeResponse(challenge_response) + + +class AccessDeniedChallengeView(ChallengeStageView): + """Used internally by FlowExecutor's stage_invalid()""" + + error_message: Optional[str] + + def __init__(self, executor: "FlowExecutorView", error_message: Optional[str] = None, **kwargs): + super().__init__(executor, **kwargs) + self.error_message = error_message + + def get_challenge(self, *args, **kwargs) -> Challenge: + return AccessDeniedChallenge( + data={ + "error_message": self.error_message or "Unknown error", + "type": ChallengeTypes.NATIVE.value, + "component": "ak-stage-access-denied", + } + ) + + # This can never be reached since this challenge is created on demand and only the + # .get() method is called + def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover + return self.executor.cancel() diff --git a/authentik/flows/views/executor.py b/authentik/flows/views/executor.py index d0b3313ce..d41573876 100644 --- a/authentik/flows/views/executor.py +++ b/authentik/flows/views/executor.py @@ -10,7 +10,6 @@ from django.http import Http404, HttpRequest, HttpResponse, HttpResponseRedirect from django.http.request import QueryDict from django.shortcuts import get_object_or_404, redirect from django.template.response import TemplateResponse -from django.urls.base import reverse from django.utils.decorators import method_decorator from django.views.decorators.clickjacking import xframe_options_sameorigin from django.views.generic import View @@ -26,7 +25,6 @@ from structlog.stdlib import BoundLogger, get_logger from authentik.core.models import USER_ATTRIBUTE_DEBUG from authentik.events.models import Event, EventAction, cleanse_dict from authentik.flows.challenge import ( - AccessDeniedChallenge, Challenge, ChallengeResponse, ChallengeTypes, @@ -51,6 +49,7 @@ from authentik.flows.planner import ( FlowPlan, FlowPlanner, ) +from authentik.flows.stage import AccessDeniedChallengeView from authentik.lib.sentry import SentryIgnoredException from authentik.lib.utils.errors import exception_to_string from authentik.lib.utils.reflection import all_subclasses, class_to_path @@ -406,21 +405,9 @@ class FlowExecutorView(APIView): is a superuser.""" self._logger.debug("f(exec): Stage invalid") self.cancel() - response = HttpChallengeResponse( - AccessDeniedChallenge( - { - "error_message": error_message, - "type": ChallengeTypes.NATIVE.value, - "component": "ak-stage-access-denied", - "flow_info": { - "title": self.flow.title, - "background": self.flow.background_url, - "cancel_url": reverse("authentik_flows:cancel"), - }, - } - ) - ) - return to_stage_response(self.request, response) + challenge_view = AccessDeniedChallengeView(self, error_message) + challenge_view.request = self.request + return to_stage_response(self.request, challenge_view.get(self.request)) def cancel(self): """Cancel current execution and return a redirect"""