diff --git a/authentik/providers/oauth2/tests/test_authorize.py b/authentik/providers/oauth2/tests/test_authorize.py index a80ba4adc..6655250d2 100644 --- a/authentik/providers/oauth2/tests/test_authorize.py +++ b/authentik/providers/oauth2/tests/test_authorize.py @@ -5,7 +5,6 @@ from django.urls import reverse from authentik.core.models import Application from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow from authentik.flows.challenge import ChallengeTypes -from authentik.flows.models import Flow from authentik.lib.generators import generate_id, generate_key from authentik.providers.oauth2.errors import AuthorizeError, ClientIdError, RedirectUriError from authentik.providers.oauth2.models import ( @@ -178,7 +177,7 @@ class TestAuthorize(OAuthTestCase): def test_full_code(self): """Test full authorization""" - flow = Flow.objects.create(slug="empty") + flow = create_test_flow() provider = OAuth2Provider.objects.create( name="test", client_id="test", @@ -214,7 +213,7 @@ class TestAuthorize(OAuthTestCase): def test_full_implicit(self): """Test full authorization""" - flow = Flow.objects.create(slug="empty") + flow = create_test_flow() provider = OAuth2Provider.objects.create( name="test", client_id="test", @@ -255,3 +254,51 @@ class TestAuthorize(OAuthTestCase): }, ) self.validate_jwt(token, provider) + + def test_full_form_post(self): + """Test full authorization (form_post response)""" + flow = create_test_flow() + provider = OAuth2Provider.objects.create( + name="test", + client_id="test", + client_secret=generate_key(), + authorization_flow=flow, + redirect_uris="http://localhost", + signing_key=create_test_cert(), + ) + Application.objects.create(name="app", slug="app", provider=provider) + state = generate_id() + user = create_test_admin_user() + self.client.force_login(user) + # Step 1, initiate params and get redirect to flow + self.client.get( + reverse("authentik_providers_oauth2:authorize"), + data={ + "response_type": "id_token", + "response_mode": "form_post", + "client_id": "test", + "state": state, + "scope": "openid", + "redirect_uri": "http://localhost", + }, + ) + response = self.client.get( + reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), + ) + token: RefreshToken = RefreshToken.objects.filter(user=user).first() + self.assertJSONEqual( + response.content.decode(), + { + "component": "ak-stage-autosubmit", + "type": ChallengeTypes.NATIVE.value, + "url": "http://localhost", + "attrs": { + "access_token": token.access_token, + "id_token": provider.encode(token.id_token.to_dict()), + "token_type": "bearer", + "expires_in": "60", + "state": state, + }, + }, + ) + self.validate_jwt(token, provider) diff --git a/authentik/providers/oauth2/views/authorize.py b/authentik/providers/oauth2/views/authorize.py index eeda58dd5..0e6247a36 100644 --- a/authentik/providers/oauth2/views/authorize.py +++ b/authentik/providers/oauth2/views/authorize.py @@ -264,7 +264,14 @@ class OAuthFulfillmentStage(StageView): parsed = urlparse(uri) if self.params.response_mode == ResponseMode.FORM_POST: - query_params = parse_qs(parsed.query) + # parse_qs returns a dictionary with values wrapped in lists, however + # we need a flat dictionary for the autosubmit challenge + + # this picks the first item in the list if the value is a list, + # otherwise just the value as-is + query_params = dict( + (k, v[0] if isinstance(v, list) else v) for k, v in parse_qs(parsed.query).items() + ) challenge = AutosubmitChallenge( data={